📄 jk_lb_worker.c
字号:
candidate = &p->lb_workers[i];
curmin = mytraffic;
}
}
}
return candidate;
}
static worker_record_t *find_best_bybusyness(lb_worker_t *p,
jk_logger_t *l)
{
static unsigned int next_offset = 0;
unsigned int i;
unsigned int j;
unsigned int offset;
int bfn = 1; /* Numerator of best busy factor */
int bfd = 1; /* Denominator of best busy factor */
int left; /* left and right are used to compare rational numbers */
int right;
/* find the least busy worker */
worker_record_t *candidate = NULL;
offset = next_offset;
/* First try to see if we have available candidate
*/
for (j = 0; j < p->num_of_workers; j++) {
i = (j + offset) % p->num_of_workers;
/* If the worker is in error state run
* retry on that worker. It will be marked as
* operational if the retry timeout is elapsed.
* The worker might still be unusable, but we try
* anyway.
*/
if (JK_WORKER_IN_ERROR(p->lb_workers[i].s)) {
retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l);
}
/* Take into calculation only the workers that are
* not in error state, stopped or not disabled.
*/
if (JK_WORKER_USABLE(p->lb_workers[i].s)) {
/* compare rational numbers: (a/b) < (c/d) iff a*d < c*b
*/
left = p->lb_workers[i].s->busy * bfd;
right = bfn * p->lb_workers[i].s->lb_factor;
if (!candidate || (left < right)) {
candidate = &p->lb_workers[i];
bfn = p->lb_workers[i].s->busy;
bfd = p->lb_workers[i].s->lb_factor;
next_offset = i + 1;
}
}
}
return candidate;
}
static worker_record_t *find_bysession_route(lb_worker_t *p,
const char *name,
jk_logger_t *l)
{
unsigned int i;
int total_factor = 0;
int uses_domain = 0;
worker_record_t *candidate = NULL;
candidate = find_by_session(p, name, l);
if (!candidate) {
uses_domain = 1;
candidate = find_best_bydomain(p, name, l);
}
if (candidate) {
if (JK_WORKER_IN_ERROR(candidate->s)) {
retry_worker(candidate, p->s->recover_wait_time, l);
}
if (candidate->s->in_error_state || candidate->s->is_stopped ) {
/* We have a worker that is error state or stopped.
* If it has a redirection set use that redirection worker.
* This enables to safely remove the member from the
* balancer. Of course you will need a some kind of
* session replication between those two remote.
*/
if (p->s->sticky_session_force)
candidate = NULL;
else if (*candidate->s->redirect)
candidate = find_by_session(p, candidate->s->redirect, l);
else if (*candidate->s->domain && !uses_domain) {
uses_domain = 1;
candidate = find_best_bydomain(p, candidate->s->domain, l);
}
if (candidate && (candidate->s->in_error_state || candidate->s->is_stopped))
candidate = NULL;
}
}
if (candidate && !uses_domain &&
p->lbmethod == JK_LB_BYREQUESTS) {
for (i = 0; i < p->num_of_workers; i++) {
if (JK_WORKER_USABLE(p->lb_workers[i].s)) {
/* Skip all workers that are not member of candidate domain */
if (*candidate->s->domain &&
strcmp(p->lb_workers[i].s->domain, candidate->s->domain))
continue;
p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor;
total_factor += p->lb_workers[i].s->lb_factor;
}
}
candidate->s->lb_value -= total_factor;
}
return candidate;
}
static worker_record_t *find_failover_worker(lb_worker_t * p,
jk_logger_t *l)
{
worker_record_t *rc = NULL;
unsigned int i;
const char *redirect = NULL;
for (i = 0; i < p->num_of_workers; i++) {
if (strlen(p->lb_workers[i].s->redirect)) {
redirect = &(p->lb_workers[i].s->redirect[0]);
break;
}
}
if (redirect)
rc = find_bysession_route(p, redirect, l);
return rc;
}
static worker_record_t *find_best_worker(lb_worker_t * p,
jk_logger_t *l)
{
worker_record_t *rc = NULL;
if (p->lbmethod == JK_LB_BYREQUESTS)
rc = find_best_byrequests(p, l);
else if (p->lbmethod == JK_LB_BYTRAFFIC)
rc = find_best_bytraffic(p, l);
else if (p->lbmethod == JK_LB_BYBUSYNESS)
rc = find_best_bybusyness(p, l);
/* By default use worker name as session route */
if (rc)
rc->r = &(rc->s->name[0]);
else
rc = find_failover_worker(p, l);
return rc;
}
static worker_record_t *get_most_suitable_worker(lb_worker_t * p,
jk_ws_service_t *s,
int attempt,
jk_logger_t *l)
{
worker_record_t *rc = NULL;
char *sessionid = NULL;
int r;
JK_TRACE_ENTER(l);
if (p->num_of_workers == 1) {
/* No need to find the best worker
* if there is a single one
*/
if (JK_WORKER_IN_ERROR(p->lb_workers[0].s)) {
retry_worker(&p->lb_workers[0], p->s->recover_wait_time, l);
}
/* Check if worker is marked for retry */
if(!p->lb_workers[0].s->in_error_state && !p->lb_workers[0].s->is_stopped) {
p->lb_workers[0].r = &(p->lb_workers[0].s->name[0]);
JK_TRACE_EXIT(l);
return &p->lb_workers[0];
}
else {
JK_TRACE_EXIT(l);
return NULL;
}
}
else if (p->s->sticky_session) {
/* Use sessionid only if sticky_session is
* defined for this load balancer
*/
sessionid = get_sessionid(s);
}
if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
r = jk_shm_lock();
else {
JK_ENTER_CS(&(p->cs), r);
}
if (!r) {
jk_log(l, JK_LOG_ERROR,
"locking failed with errno=%d",
errno);
JK_TRACE_EXIT(l);
return NULL;
}
if (sessionid) {
char *session = sessionid;
if (JK_IS_DEBUG_LEVEL(l)) {
jk_log(l, JK_LOG_DEBUG,
"total sessionid is %s",
sessionid ? sessionid : "empty");
}
while (sessionid) {
char *next = strchr(sessionid, ';');
char *session_route = NULL;
if (next)
*next++ = '\0';
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"searching worker for partial sessionid %s",
sessionid);
session_route = strchr(sessionid, '.');
if (session_route) {
++session_route;
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"searching worker for session route %s",
session_route);
/* We have a session route. Whow! */
rc = find_bysession_route(p, session_route, l);
if (rc) {
if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_unlock();
else {
JK_LEAVE_CS(&(p->cs), r);
}
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"found worker %s for route %s and partial sessionid %s",
rc->s->name, session_route, sessionid);
JK_TRACE_EXIT(l);
return rc;
}
}
/* Try next partial sessionid if present */
sessionid = next;
rc = NULL;
}
if (!rc && p->s->sticky_session_force) {
if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_unlock();
else {
JK_LEAVE_CS(&(p->cs), r);
}
jk_log(l, JK_LOG_INFO,
"all workers are in error state for session %s",
session);
JK_TRACE_EXIT(l);
return NULL;
}
}
rc = find_best_worker(p, l);
if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_unlock();
else {
JK_LEAVE_CS(&(p->cs), r);
}
if (rc && JK_IS_DEBUG_LEVEL(l)) {
jk_log(l, JK_LOG_DEBUG,
"found best worker (%s) using %s method", rc->s->name,
p->lbmethod == JK_LB_BYREQUESTS ? "by request" : "by traffic");
}
JK_TRACE_EXIT(l);
return rc;
}
static int JK_METHOD service(jk_endpoint_t *e,
jk_ws_service_t *s,
jk_logger_t *l, int *is_error)
{
JK_TRACE_ENTER(l);
if (e && e->endpoint_private && s && is_error) {
lb_endpoint_t *p = e->endpoint_private;
int attempt = 0;
int num_of_workers = p->worker->num_of_workers;
worker_record_t *prec = NULL;
/* Set returned error to OK */
*is_error = JK_HTTP_OK;
/* set the recovery post, for LB mode */
s->reco_buf = jk_b_new(s->pool);
jk_b_set_buffer_size(s->reco_buf, DEF_BUFFER_SZ);
jk_b_reset(s->reco_buf);
s->reco_status = RECO_INITED;
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"service sticky_session=%d",
p->worker->s->sticky_session);
while (num_of_workers) {
worker_record_t *rec =
get_most_suitable_worker(p->worker, s, attempt, l);
int rc;
/* Do not reuse previous worker, because
* that worker already failed.
*/
if (rec && rec != prec) {
int is_service_error = JK_HTTP_OK;
int service_stat = JK_FALSE;
jk_endpoint_t *end = NULL;
s->jvm_route = rec->r;
rc = rec->w->get_endpoint(rec->w, &end, l);
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"service worker=%s jvm_route=%s",
rec->s->name, s->jvm_route);
if (rc && end) {
size_t rd = 0;
size_t wr = 0;
/* Reset endpoint read and write sizes for
* this request.
*/
end->rd = end->wr = 0;
if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_lock();
rec->s->elected++;
/* Increment the number of workers serving request */
p->worker->s->busy++;
if (p->worker->s->busy > p->worker->s->max_busy)
p->worker->s->max_busy = p->worker->s->busy;
rec->s->busy++;
if (rec->s->busy > rec->s->max_busy)
rec->s->max_busy = rec->s->busy;
if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_unlock();
service_stat = end->service(end, s, l, &is_service_error);
rd = end->rd;
wr = end->wr;
end->done(&end, l);
if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_lock();
/* Update partial reads and writes if any */
rec->s->readed += rd;
rec->s->transferred += wr;
/* When returning the endpoint mark the worker as not busy.
* We have at least one endpoint free
*/
rec->s->is_busy = JK_FALSE;
/* Decrement the busy worker count.
* Check if the busy was reset to zero by graceful
* restart of the server.
*/
if (rec->s->busy)
rec->s->busy--;
if (p->worker->s->busy)
p->worker->s->busy--;
if (service_stat == JK_TRUE) {
rec->s->in_error_state = JK_FALSE;
rec->s->in_recovering = JK_FALSE;
rec->s->error_time = 0;
if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
jk_shm_unlock();
JK_TRACE_EXIT(l);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -