📄 jk_lb_worker.c
字号:
* retry on that worker. It will be marked as * operational if the retry timeout is elapsed. * The worker might still be unusable, but we try * anyway. */ if (JK_WORKER_IN_ERROR(p->lb_workers[i].s)) { retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l); } /* Take into calculation only the workers that are * not in error state, stopped or not disabled. */ if (JK_WORKER_USABLE(p->lb_workers[i].s)) { mytraffic = (p->lb_workers[i].s->transferred/p->lb_workers[i].s->lb_factor) + (p->lb_workers[i].s->readed/p->lb_workers[i].s->lb_factor); if (!candidate || mytraffic < curmin) { candidate = &p->lb_workers[i]; curmin = mytraffic; } } } if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); return candidate;}static worker_record_t *find_bysession_route(lb_worker_t *p, const char *name, jk_logger_t *l){ unsigned int i; int total_factor = 0; int uses_domain = 0; worker_record_t *candidate = NULL; candidate = find_by_session(p, name, l); if (!candidate) { uses_domain = 1; candidate = find_best_bydomain(p, name, l); } if (candidate) { if (JK_WORKER_IN_ERROR(candidate->s)) { retry_worker(candidate, p->s->recover_wait_time, l); } if (candidate->s->in_error_state || candidate->s->is_stopped ) { /* We have a worker that is error state or stopped. * If it has a redirection set use that redirection worker. * This enables to safely remove the member from the * balancer. Of course you will need a some kind of * session replication between those two remote. */ if (p->s->sticky_session_force) candidate = NULL; else if (*candidate->s->redirect) candidate = find_by_session(p, candidate->s->redirect, l); else if (*candidate->s->domain && !uses_domain) { uses_domain = 1; candidate = find_best_bydomain(p, candidate->s->domain, l); } if (candidate && (candidate->s->in_error_state || candidate->s->is_stopped)) candidate = NULL; } } if (candidate && !uses_domain && p->lbmethod == JK_LB_BYREQUESTS) { if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); for (i = 0; i < p->num_of_workers; i++) { if (JK_WORKER_USABLE(p->lb_workers[i].s)) { /* Skip all workers that are not member of candidate domain */ if (*candidate->s->domain && strcmp(p->lb_workers[i].s->domain, candidate->s->domain)) continue; p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor; total_factor += p->lb_workers[i].s->lb_factor; } } candidate->s->lb_value -= total_factor; if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); } return candidate;}static worker_record_t *find_failover_worker(lb_worker_t * p, jk_logger_t *l){ worker_record_t *rc = NULL; unsigned int i; const char *redirect = NULL; for (i = 0; i < p->num_of_workers; i++) { if (strlen(p->lb_workers[i].s->redirect)) { redirect = &(p->lb_workers[i].s->redirect[0]); break; } } if (redirect) rc = find_bysession_route(p, redirect, l); return rc;}static worker_record_t *find_best_worker(lb_worker_t * p, jk_logger_t *l){ worker_record_t *rc = NULL; if (p->lbmethod == JK_LB_BYREQUESTS) rc = find_best_byrequests(p, l); else if (p->lbmethod == JK_LB_BYTRAFFIC) rc = find_best_bytraffic(p, l); /* By default use worker name as session route */ if (rc) rc->r = &(rc->s->name[0]); else rc = find_failover_worker(p, l); return rc;}static worker_record_t *get_most_suitable_worker(lb_worker_t * p, jk_ws_service_t *s, int attempt, jk_logger_t *l){ worker_record_t *rc = NULL; char *sessionid = NULL; int r; JK_TRACE_ENTER(l); if (p->num_of_workers == 1) { /* No need to find the best worker * if there is a single one */ if (JK_WORKER_IN_ERROR(p->lb_workers[0].s)) { retry_worker(&p->lb_workers[0], p->s->recover_wait_time, l); } /* Check if worker is marked for retry */ if(!p->lb_workers[0].s->in_error_state && !p->lb_workers[0].s->is_stopped) { p->lb_workers[0].r = &(p->lb_workers[0].s->name[0]); JK_TRACE_EXIT(l); return &p->lb_workers[0]; } else { JK_TRACE_EXIT(l); return NULL; } } else if (p->s->sticky_session) { /* Use sessionid only if sticky_session is * defined for this load balancer */ sessionid = get_sessionid(s); } JK_ENTER_CS(&(p->cs), r); if (!r) { jk_log(l, JK_LOG_ERROR, "locking thread with errno=%d", errno); JK_TRACE_EXIT(l); return NULL; } if (sessionid) { char *session = sessionid; if (JK_IS_DEBUG_LEVEL(l)) { jk_log(l, JK_LOG_DEBUG, "total sessionid is %s", sessionid ? sessionid : "empty"); } while (sessionid) { char *next = strchr(sessionid, ';'); char *session_route = NULL; if (next) *next++ = '\0'; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "searching worker for partial sessionid %s", sessionid); session_route = strchr(sessionid, '.'); if (session_route) { ++session_route; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "searching worker for session route %s", session_route); /* We have a session route. Whow! */ rc = find_bysession_route(p, session_route, l); if (rc) { JK_LEAVE_CS(&(p->cs), r); if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "found worker %s for route %s and partial sessionid %s", rc->s->name, session_route, sessionid); JK_TRACE_EXIT(l); return rc; } } /* Try next partial sessionid if present */ sessionid = next; rc = NULL; } if (!rc && p->s->sticky_session_force) { JK_LEAVE_CS(&(p->cs), r); jk_log(l, JK_LOG_INFO, "all workers are in error state for session %s", session); JK_TRACE_EXIT(l); return NULL; } } rc = find_best_worker(p, l); JK_LEAVE_CS(&(p->cs), r); if (rc && JK_IS_DEBUG_LEVEL(l)) { jk_log(l, JK_LOG_DEBUG, "found best worker (%s) using %s method", rc->s->name, p->lbmethod == JK_LB_BYREQUESTS ? "by request" : "by traffic"); } JK_TRACE_EXIT(l); return rc;}static int JK_METHOD service(jk_endpoint_t *e, jk_ws_service_t *s, jk_logger_t *l, int *is_error){ JK_TRACE_ENTER(l); if (e && e->endpoint_private && s && is_error) { lb_endpoint_t *p = e->endpoint_private; int attempt = 0; int num_of_workers = p->worker->num_of_workers; worker_record_t *prec = NULL; /* Set returned error to OK */ *is_error = JK_HTTP_OK; /* set the recovery post, for LB mode */ s->reco_buf = jk_b_new(s->pool); jk_b_set_buffer_size(s->reco_buf, DEF_BUFFER_SZ); jk_b_reset(s->reco_buf); s->reco_status = RECO_INITED; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "service sticky_session=%d", p->worker->s->sticky_session); while (num_of_workers) { worker_record_t *rec = get_most_suitable_worker(p->worker, s, attempt++, l); int rc; /* Do not reuse previous worker, because * that worker already failed. */ if (rec && rec != prec) { int is_service_error = JK_HTTP_OK; int service_stat = JK_FALSE; jk_endpoint_t *end = NULL; s->jvm_route = rec->r; rc = rec->w->get_endpoint(rec->w, &end, l); if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "service worker=%s jvm_route=%s", rec->s->name, s->jvm_route); rec->s->elected++; if (rc && end) { /* Reset endpoint read and write sizes for * this request. */ end->rd = end->wr = 0; /* Increment the number of workers serving request */ p->worker->s->busy++; if (p->worker->s->busy > p->worker->s->max_busy) p->worker->s->max_busy = p->worker->s->busy; rec->s->busy++; if (rec->s->busy > rec->s->max_busy) rec->s->max_busy = rec->s->busy; service_stat = end->service(end, s, l, &is_service_error); /* Update partial reads and writes if any */ rec->s->readed += end->rd; rec->s->transferred += end->wr; end->done(&end, l); /* When returning the endpoint mark the worker as not busy. * We have at least one endpoint free */ rec->s->is_busy = JK_FALSE; /* Decrement the busy worker count */ rec->s->busy--; p->worker->s->busy--; if (service_stat == JK_TRUE) { rec->s->in_error_state = JK_FALSE; rec->s->in_recovering = JK_FALSE; rec->s->error_time = 0; JK_TRACE_EXIT(l); return JK_TRUE; } } else { /* If we can not get the endpoint * mark the worker as busy rather then * as in error */ rec->s->is_busy = JK_TRUE; jk_log(l, JK_LOG_INFO, "could not get free endpoint for worker %s", rec->s->name); /* Decrement the worker count and try another worker */ --num_of_workers; prec = rec; continue; } if (service_stat == JK_FALSE) { /* * Service failed !!! * * Time for fault tolerance (if possible)... */ rec->s->errors++; rec->s->in_error_state = JK_TRUE; rec->s->in_recovering = JK_FALSE; rec->s->error_time = time(NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -