⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jk_lb_worker.c

📁 精通tomcat书籍原代码,希望大家共同学习
💻 C
📖 第 1 页 / 共 3 页
字号:
                candidate = &p->lb_workers[i];
                curmin = mytraffic;
            }
        }
    }
    return candidate;
}

/*
 * Select the usable member with the lowest busy / lb_factor ratio.
 *
 * The scan does not always start at index 0: it starts at a rotating
 * position kept in a function-local static, which is set to the slot
 * right after the member that most recently became the candidate.
 *
 * Members for which JK_WORKER_IN_ERROR holds are handed to
 * retry_worker() before their usability is tested.
 *
 * Returns the selected member, or NULL when no member is usable.
 */
static worker_record_t *find_best_bybusyness(lb_worker_t *p,
                                             jk_logger_t *l)
{
    static unsigned int next_offset = 0;
    const unsigned int start = next_offset;
    worker_record_t *best = NULL;
    int best_busy = 1;    /* numerator of the best busy ratio so far */
    int best_factor = 1;  /* denominator of the best busy ratio so far */
    unsigned int step;

    for (step = 0; step < p->num_of_workers; step++) {
        unsigned int idx = (step + start) % p->num_of_workers;
        worker_record_t *w = &p->lb_workers[idx];
        int lhs;
        int rhs;

        /* Give a member in error state a chance to be retried; whether
         * it is usable afterwards is decided by the check below.
         */
        if (JK_WORKER_IN_ERROR(w->s)) {
            retry_worker(w, p->s->recover_wait_time, l);
        }
        if (!(JK_WORKER_USABLE(w->s)))
            continue;

        /* Compare the ratios without dividing:
         * (a/b) < (c/d)  iff  a*d < c*b
         */
        lhs = w->s->busy * best_factor;
        rhs = best_busy * w->s->lb_factor;

        /* The first usable member always wins; later ones must be
         * strictly less busy than the current best.
         */
        if (best == NULL || lhs < rhs) {
            best = w;
            best_busy = w->s->busy;
            best_factor = w->s->lb_factor;
            next_offset = idx + 1;
        }
    }
    return best;
}

/*
 * Resolve a session route name to a balancer member.
 *
 * Lookup order:
 *   1. find_by_session() with the given name;
 *   2. if that fails, find_best_bydomain() with the same name.
 *
 * If the member found is in error state or stopped (after a retry
 * attempt when JK_WORKER_IN_ERROR holds), then:
 *   - with sticky_session_force set, nothing is returned;
 *   - otherwise its redirect member is looked up, if one is configured;
 *   - otherwise a member of its domain is looked up, unless a domain
 *     lookup was already done.
 * A replacement that is itself in error state or stopped is discarded.
 *
 * When a member was found without a domain lookup and the balancer
 * method is JK_LB_BYREQUESTS, the lb_value of every usable member (of
 * the same domain, if the chosen member has one) is raised by its
 * lb_factor and the chosen member's lb_value is lowered by the sum.
 *
 * Returns the member to use, or NULL.
 */
static worker_record_t *find_bysession_route(lb_worker_t *p,
                                             const char *name,
                                             jk_logger_t *l)
{
    worker_record_t *chosen;
    unsigned int idx;
    int factor_sum = 0;
    int via_domain = 0;

    chosen = find_by_session(p, name, l);
    if (chosen == NULL) {
        via_domain = 1;
        chosen = find_best_bydomain(p, name, l);
    }
    if (chosen == NULL)
        return NULL;

    if (JK_WORKER_IN_ERROR(chosen->s)) {
        retry_worker(chosen, p->s->recover_wait_time, l);
    }

    if (chosen->s->in_error_state || chosen->s->is_stopped) {
        /* The routed member cannot serve the request. Fall back to its
         * redirect member or to its domain, which lets a member be taken
         * out of the balancer safely (session replication between the
         * two members is then required).
         */
        if (p->s->sticky_session_force) {
            chosen = NULL;
        }
        else if (*chosen->s->redirect) {
            chosen = find_by_session(p, chosen->s->redirect, l);
        }
        else if (*chosen->s->domain && !via_domain) {
            via_domain = 1;
            chosen = find_best_bydomain(p, chosen->s->domain, l);
        }

        /* Reject a replacement that is unusable as well. */
        if (chosen != NULL &&
            (chosen->s->in_error_state || chosen->s->is_stopped)) {
            chosen = NULL;
        }
    }

    /* The lb_value bookkeeping applies only to a member found without a
     * domain lookup under the by-requests method.
     */
    if (chosen == NULL || via_domain || p->lbmethod != JK_LB_BYREQUESTS)
        return chosen;

    for (idx = 0; idx < p->num_of_workers; idx++) {
        worker_record_t *w = &p->lb_workers[idx];

        if (!(JK_WORKER_USABLE(w->s)))
            continue;
        /* Skip all members outside the chosen member's domain */
        if (*chosen->s->domain &&
            strcmp(w->s->domain, chosen->s->domain) != 0)
            continue;

        w->s->lb_value += w->s->lb_factor;
        factor_sum += w->s->lb_factor;
    }
    chosen->s->lb_value -= factor_sum;

    return chosen;
}

/*
 * Locate a failover member through the first configured redirect.
 *
 * Scans the members in order and takes the first non-empty redirect
 * name; that name is then resolved with find_bysession_route().
 *
 * Returns the resolved member, or NULL when no member has a redirect
 * or the redirect cannot be resolved.
 */
static worker_record_t *find_failover_worker(lb_worker_t * p,
                                             jk_logger_t *l)
{
    unsigned int idx;

    for (idx = 0; idx < p->num_of_workers; idx++) {
        const char *target = &(p->lb_workers[idx].s->redirect[0]);

        /* Only the first non-empty redirect is ever tried. */
        if (*target != '\0')
            return find_bysession_route(p, target, l);
    }
    return NULL;
}

/*
 * Pick a member according to the balancer's configured method.
 *
 * Dispatches on p->lbmethod to the by-requests, by-traffic or
 * by-busyness selector. When a member is selected, its session route
 * (r) is set to its own name. When none is selected, the result of
 * find_failover_worker() is returned instead, with r left as is.
 *
 * Returns the member to use, or NULL.
 */
static worker_record_t *find_best_worker(lb_worker_t * p,
                                         jk_logger_t *l)
{
    worker_record_t *best = NULL;

    if (p->lbmethod == JK_LB_BYREQUESTS) {
        best = find_best_byrequests(p, l);
    }
    else if (p->lbmethod == JK_LB_BYTRAFFIC) {
        best = find_best_bytraffic(p, l);
    }
    else if (p->lbmethod == JK_LB_BYBUSYNESS) {
        best = find_best_bybusyness(p, l);
    }

    if (best == NULL)
        return find_failover_worker(p, l);

    /* By default use the worker name as session route */
    best->r = &(best->s->name[0]);
    return best;
}

/*
 * Choose the balancer member that should serve the request.
 *
 * p       - the load balancer.
 * s       - the request; used only to extract the session id when
 *           sticky sessions are enabled.
 * attempt - not used by this function.
 * l       - logger.
 *
 * Behavior:
 *   - With exactly one member, that member is returned unless it is in
 *     error state (after a retry attempt) or stopped; no lock is taken.
 *   - Otherwise the balancer lock is taken: jk_shm_lock() when p->lblock
 *     is JK_LB_LOCK_PESSIMISTIC, else the critical section p->cs.
 *   - With sticky sessions, the session id string is split in place at
 *     each ';' and, for every part that contains a '.', the text after
 *     the '.' is resolved with find_bysession_route(). The first match
 *     is returned.
 *   - If no part matches and sticky_session_force is set, NULL is
 *     returned.
 *   - In all remaining cases find_best_worker() decides.
 *
 * Returns the selected member, or NULL if locking failed or no member
 * is available. The lock is released on every return path that took it.
 */
static worker_record_t *get_most_suitable_worker(lb_worker_t * p,
                                                 jk_ws_service_t *s,
                                                 int attempt,
                                                 jk_logger_t *l)
{
    worker_record_t *rc = NULL;
    char *sessionid = NULL;
    int r;

    JK_TRACE_ENTER(l);
    if (p->num_of_workers == 1) {
        /* No need to find the best worker
         * if there is a single one
         */
        if (JK_WORKER_IN_ERROR(p->lb_workers[0].s)) {
            retry_worker(&p->lb_workers[0], p->s->recover_wait_time, l);
        }
        /* Check if worker is marked for retry */
        if (!p->lb_workers[0].s->in_error_state && !p->lb_workers[0].s->is_stopped) {
            p->lb_workers[0].r = &(p->lb_workers[0].s->name[0]);
            JK_TRACE_EXIT(l);
            return &p->lb_workers[0];
        }
        else {
            JK_TRACE_EXIT(l);
            return NULL;
        }
    }
    else if (p->s->sticky_session) {
        /* Use sessionid only if sticky_session is
         * defined for this load balancer
         */
        sessionid = get_sessionid(s);
    }
    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
        r = jk_shm_lock();
    else {
        JK_ENTER_CS(&(p->cs), r);
    }
    if (!r) {
        jk_log(l, JK_LOG_ERROR,
               "locking failed with errno=%d",
               errno);
        JK_TRACE_EXIT(l);
        return NULL;
    }
    if (sessionid) {
        /* Keep the start of the string for the log message below; note
         * that the loop terminates each part in place, so this points
         * at the first part only once the loop has run.
         */
        char *session = sessionid;
        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "total sessionid is %s",
                   sessionid);
        }
        while (sessionid) {
            char *next = strchr(sessionid, ';');
            char *session_route = NULL;
            if (next)
                *next++ = '\0';
            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "searching worker for partial sessionid %s",
                       sessionid);
            session_route = strchr(sessionid, '.');
            if (session_route) {
                ++session_route;

                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "searching worker for session route %s",
                           session_route);

                /* We have a session route; try to resolve it. */
                rc = find_bysession_route(p, session_route, l);
                if (rc) {
                    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
                        jk_shm_unlock();
                    else {
                        JK_LEAVE_CS(&(p->cs), r);
                    }
                    if (JK_IS_DEBUG_LEVEL(l))
                        jk_log(l, JK_LOG_DEBUG,
                               "found worker %s for route %s and partial sessionid %s",
                               rc->s->name, session_route, sessionid);
                    JK_TRACE_EXIT(l);
                    return rc;
                }
            }
            /* Try next partial sessionid if present */
            sessionid = next;
            rc = NULL;
        }
        if (!rc && p->s->sticky_session_force) {
            /* Forced stickiness: never fall back to another member. */
            if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
                jk_shm_unlock();
            else {
                JK_LEAVE_CS(&(p->cs), r);
            }
            jk_log(l, JK_LOG_INFO,
                   "all workers are in error state for session %s",
                   session);
            JK_TRACE_EXIT(l);
            return NULL;
        }
    }
    rc = find_best_worker(p, l);
    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
        jk_shm_unlock();
    else {
        JK_LEAVE_CS(&(p->cs), r);
    }
    if (rc && JK_IS_DEBUG_LEVEL(l)) {
        /* Name the configured method explicitly, so that the busyness
         * method is not reported as "by traffic".
         */
        const char *method;

        if (p->lbmethod == JK_LB_BYREQUESTS)
            method = "by request";
        else if (p->lbmethod == JK_LB_BYTRAFFIC)
            method = "by traffic";
        else if (p->lbmethod == JK_LB_BYBUSYNESS)
            method = "by busyness";
        else
            method = "unknown";

        jk_log(l, JK_LOG_DEBUG,
               "found best worker (%s) using %s method", rc->s->name,
               method);
    }
    JK_TRACE_EXIT(l);
    return rc;
}

static int JK_METHOD service(jk_endpoint_t *e,
                             jk_ws_service_t *s,
                             jk_logger_t *l, int *is_error)
{
    JK_TRACE_ENTER(l);

    if (e && e->endpoint_private && s && is_error) {
        lb_endpoint_t *p = e->endpoint_private;
        int attempt = 0;
        int num_of_workers = p->worker->num_of_workers;
        worker_record_t *prec = NULL;
        /* Set returned error to OK */
        *is_error = JK_HTTP_OK;

        /* set the recovery post, for LB mode */
        s->reco_buf = jk_b_new(s->pool);
        jk_b_set_buffer_size(s->reco_buf, DEF_BUFFER_SZ);
        jk_b_reset(s->reco_buf);
        s->reco_status = RECO_INITED;
        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG,
                   "service sticky_session=%d",
                   p->worker->s->sticky_session);

        while (num_of_workers) {
            worker_record_t *rec =
                get_most_suitable_worker(p->worker, s, attempt, l);
            int rc;
            /* Do not reuse previous worker, because
             * that worker already failed.
             */
            if (rec && rec != prec) {
                int is_service_error = JK_HTTP_OK;
                int service_stat = JK_FALSE;
                jk_endpoint_t *end = NULL;

                s->jvm_route = rec->r;
                rc = rec->w->get_endpoint(rec->w, &end, l);

                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "service worker=%s jvm_route=%s",
                           rec->s->name, s->jvm_route);
                if (rc && end) {
                    size_t rd = 0;
                    size_t wr = 0;
                    /* Reset endpoint read and write sizes for
                     * this request.
                     */
                    end->rd = end->wr = 0;
                    if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                        jk_shm_lock();

                    rec->s->elected++;
                    /* Increment the number of workers serving request */
                    p->worker->s->busy++;
                    if (p->worker->s->busy > p->worker->s->max_busy)
                        p->worker->s->max_busy = p->worker->s->busy;
                    rec->s->busy++;
                    if (rec->s->busy > rec->s->max_busy)
                        rec->s->max_busy = rec->s->busy;
                    if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                        jk_shm_unlock();

                    service_stat = end->service(end, s, l, &is_service_error);
                    rd = end->rd;
                    wr = end->wr;
                    end->done(&end, l);

                    if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                        jk_shm_lock();

                    /* Update partial reads and writes if any */
                    rec->s->readed += rd;
                    rec->s->transferred += wr;

                    /* When returning the endpoint mark the worker as not busy.
                     * We have at least one endpoint free
                     */
                    rec->s->is_busy = JK_FALSE;
                    /* Decrement the busy worker count.
                     * Check if the busy was reset to zero by graceful
                     * restart of the server.
                     */
                    if (rec->s->busy)
                        rec->s->busy--;
                    if (p->worker->s->busy)
                        p->worker->s->busy--;
                    if (service_stat == JK_TRUE) {
                        rec->s->in_error_state = JK_FALSE;
                        rec->s->in_recovering = JK_FALSE;
                        rec->s->error_time = 0;
                        if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                            jk_shm_unlock();
                        JK_TRACE_EXIT(l);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -