jk_lb_worker.c
来自「以便Apache与其他服务进行整合 Mod_JK安装」· C语言 代码 · 共 1,498 行 · 第 1/4 页
C
1,498 行
jk_pool_realloc(s->pool, sz, result, osz); strcat(result, ";"); strcat(result, id_start); } } } } } } return result;}/* Retrieve session id from the cookie or the parameter * (parameter first) */static char *get_sessionid(jk_ws_service_t *s, jk_logger_t *l){ char *val; val = get_path_param(s, JK_PATH_SESSION_IDENTIFIER); if (!val) { val = get_cookie(s, JK_SESSION_IDENTIFIER); } if (val && !*val) { /* TODO: For now only log the empty sessions. * However we should probably return 400 * (BAD_REQUEST) in this case */ jk_log(l, JK_LOG_INFO, "Detected empty session identifier."); return NULL; } return val;}static void close_workers(lb_worker_t * p, int num_of_workers, jk_logger_t *l){ int i = 0; for (i = 0; i < num_of_workers; i++) { p->lb_workers[i].w->destroy(&(p->lb_workers[i].w), l); }}/* If the worker is in error state run * retry on that worker. It will be marked as * operational if the retry timeout is elapsed. * The worker might still be unusable, but we try * anyway. * If the worker is in ok state and got no requests * since the last global maintenance, we mark its * state as not available. * Return the number of workers not in error state. */static int recover_workers(lb_worker_t *p, jk_uint64_t curmax, time_t now, jk_logger_t *l){ unsigned int i; int non_error = 0; int elapsed; worker_record_t *w = NULL; JK_TRACE_ENTER(l); if (p->sequence != p->s->sequence) jk_lb_pull(p, l); for (i = 0; i < p->num_of_workers; i++) { w = &p->lb_workers[i]; if (w->s->state == JK_LB_STATE_ERROR) { elapsed = (int)difftime(now, w->s->error_time); if (elapsed <= p->s->recover_wait_time) { if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "worker %s will recover in %d seconds", w->s->name, p->s->recover_wait_time - elapsed); } else { if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "worker %s is marked for recovery", w->s->name); if (p->lbmethod != JK_LB_METHOD_BUSYNESS) w->s->lb_value = curmax; w->s->reply_timeouts = 0; w->s->state = JK_LB_STATE_RECOVER; non_error++; } } else { non_error++; if (w->s->state == JK_LB_STATE_OK && w->s->elected == w->s->elected_snapshot) w->s->state = JK_LB_STATE_IDLE; } w->s->elected_snapshot = w->s->elected; } JK_TRACE_EXIT(l); return non_error;}static int force_recovery(lb_worker_t *p, jk_logger_t *l){ unsigned int i; int forced = 0; worker_record_t *w = NULL; JK_TRACE_ENTER(l); for (i = 0; i < p->num_of_workers; i++) { w = &p->lb_workers[i]; if (w->s->state == JK_LB_STATE_ERROR) { if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_INFO, "worker %s is marked for recovery", w->s->name); w->s->state = JK_LB_STATE_FORCE; forced++; } } JK_TRACE_EXIT(l); return forced;}/* Divide old load values by the decay factor, * such that older values get less important * for the routing decisions. */static jk_uint64_t decay_load(lb_worker_t *p, time_t exponent, jk_logger_t *l){ unsigned int i; jk_uint64_t curmax = 0; JK_TRACE_ENTER(l); if (p->lbmethod != JK_LB_METHOD_BUSYNESS) { for (i = 0; i < p->num_of_workers; i++) { p->lb_workers[i].s->lb_value >>= exponent; if (p->lb_workers[i].s->lb_value > curmax) { curmax = p->lb_workers[i].s->lb_value; } p->lb_workers[i].s->reply_timeouts >>= exponent; } } JK_TRACE_EXIT(l); return curmax;}static int JK_METHOD maintain_workers(jk_worker_t *p, time_t now, jk_logger_t *l){ unsigned int i = 0; jk_uint64_t curmax = 0; long delta; JK_TRACE_ENTER(l); if (p && p->worker_private) { lb_worker_t *lb = (lb_worker_t *)p->worker_private; for (i = 0; i < lb->num_of_workers; i++) { if (lb->lb_workers[i].w->maintain) { lb->lb_workers[i].w->maintain(lb->lb_workers[i].w, now, l); } } jk_shm_lock(); /* Now we check for global maintenance (once for all processes). * Checking workers for recovery and applying decay to the * load values should not be done by each process individually. * Therefore we globally sync and we use a global timestamp. * Since it's possible that we come here a few milliseconds * before the interval has passed, we allow a little tolerance. */ delta = (long)difftime(now, lb->s->last_maintain_time) + JK_LB_MAINTAIN_TOLERANCE; if (delta >= lb->maintain_time) { lb->s->last_maintain_time = now; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "decay with 2^%d", JK_LB_DECAY_MULT * delta / lb->maintain_time); curmax = decay_load(lb, JK_LB_DECAY_MULT * delta / lb->maintain_time, l); if (!recover_workers(lb, curmax, now, l)) { force_recovery(lb, l); } } jk_shm_unlock(); } else { JK_LOG_NULL_PARAMS(l); } JK_TRACE_EXIT(l); return JK_TRUE;}static worker_record_t *find_by_session(lb_worker_t *p, const char *name, jk_logger_t *l){ worker_record_t *rc = NULL; unsigned int i; for (i = 0; i < p->num_of_workers; i++) { if (strcmp(p->lb_workers[i].s->route, name) == 0) { rc = &p->lb_workers[i]; rc->r = &(rc->s->route[0]); break; } } return rc;}static worker_record_t *find_best_bydomain(lb_worker_t *p, const char *domain, jk_logger_t *l){ unsigned int i; int d = 0; jk_uint64_t curmin = 0; worker_record_t *candidate = NULL; /* First try to see if we have available candidate */ for (i = 0; i < p->num_of_workers; i++) { /* Skip all workers that are not member of domain */ if (strlen(p->lb_workers[i].s->domain) == 0 || strcmp(p->lb_workers[i].s->domain, domain)) continue; /* Take into calculation only the workers that are * not in error state, stopped, disabled or busy. */ if (JK_WORKER_USABLE(p->lb_workers[i].s)) { if (!candidate || p->lb_workers[i].s->distance < d || (p->lb_workers[i].s->lb_value < curmin && p->lb_workers[i].s->distance == d)) { candidate = &p->lb_workers[i]; curmin = p->lb_workers[i].s->lb_value; d = p->lb_workers[i].s->distance; } } } if (candidate) { candidate->r = &(candidate->s->domain[0]); } return candidate;}static worker_record_t *find_best_byvalue(lb_worker_t *p, jk_logger_t *l){ static unsigned int next_offset = 0; unsigned int i; unsigned int j; unsigned int offset; int d = 0; jk_uint64_t curmin = 0; /* find the least busy worker */ worker_record_t *candidate = NULL; offset = next_offset; /* First try to see if we have available candidate */ for (j = offset; j < p->num_of_workers + offset; j++) { i = j % p->num_of_workers; /* Take into calculation only the workers that are * not in error state, stopped, disabled or busy. */ if (JK_WORKER_USABLE(p->lb_workers[i].s)) { if (!candidate || p->lb_workers[i].s->distance < d || (p->lb_workers[i].s->lb_value < curmin && p->lb_workers[i].s->distance == d)) { candidate = &p->lb_workers[i]; curmin = p->lb_workers[i].s->lb_value; d = p->lb_workers[i].s->distance; next_offset = i + 1; } } } return candidate;}static worker_record_t *find_bysession_route(lb_worker_t *p, const char *name, jk_logger_t *l){ int uses_domain = 0; worker_record_t *candidate = NULL; candidate = find_by_session(p, name, l); if (!candidate) { uses_domain = 1; candidate = find_best_bydomain(p, name, l); } if (candidate) { if (!JK_WORKER_USABLE_STICKY(candidate->s)) { /* We have a worker that is error state or stopped. * If it has a redirection set use that redirection worker. * This enables to safely remove the member from the * balancer. Of course you will need a some kind of * session replication between those two remote. */ if (p->sticky_session_force) candidate = NULL; else if (*candidate->s->redirect) candidate = find_by_session(p, candidate->s->redirect, l); else if (*candidate->s->domain && !uses_domain) { uses_domain = 1; candidate = find_best_bydomain(p, candidate->s->domain, l); } if (candidate && !JK_WORKER_USABLE_STICKY(candidate->s)) candidate = NULL; } } return candidate;}static worker_record_t *find_failover_worker(lb_worker_t * p, jk_logger_t *l){ worker_record_t *rc = NULL; unsigned int i; const char *redirect = NULL; for (i = 0; i < p->num_of_workers; i++) { if (strlen(p->lb_workers[i].s->redirect)) { redirect = &(p->lb_workers[i].s->redirect[0]); break; } } if (redirect) rc = find_bysession_route(p, redirect, l); return rc;}static worker_record_t *find_best_worker(lb_worker_t * p, jk_logger_t *l){ worker_record_t *rc = NULL; rc = find_best_byvalue(p, l); /* By default use worker route as session route */ if (rc) rc->r = &(rc->s->route[0]); else rc = find_failover_worker(p, l); return rc;}static worker_record_t *get_most_suitable_worker(lb_worker_t * p, char *sessionid, jk_ws_service_t *s, jk_logger_t *l){ worker_record_t *rc = NULL; int r; JK_TRACE_ENTER(l); if (p->num_of_workers == 1) { /* No need to find the best worker * if there is a single one */ if (JK_WORKER_USABLE_STICKY(p->lb_workers[0].s)) { if (p->lb_workers[0].s->activation != JK_LB_ACTIVATION_DISABLED) { p->lb_workers[0].r = &(p->lb_workers[0].s->route[0]);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?