📄 jk_lb_worker.c
字号:
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/***************************************************************************
* Description: Load balancer worker, knows how to load balance among *
* several workers. *
* Author: Gal Shachor <shachor@il.ibm.com> *
* Author: Mladen Turk <mturk@apache.org> *
* Based on: *
* Version: $Revision: 387178 $ *
***************************************************************************/
#include "jk_pool.h"
#include "jk_service.h"
#include "jk_util.h"
#include "jk_worker.h"
#include "jk_lb_worker.h"
#include "jk_ajp13.h"
#include "jk_ajp13_worker.h"
#include "jk_ajp14_worker.h"
#include "jk_mt.h"
#include "jk_shm.h"
/*
* The load balancing code in this
*/
/*
* Time to wait before retry...
*/
#define JK_WORKER_IN_ERROR(w) ((w)->in_error_state && !(w)->is_busy)
#define JK_WORKER_USABLE(w) (!(w)->in_error_state && !(w)->is_stopped && !(w)->is_disabled && !(w)->is_busy)
struct lb_endpoint
{
jk_endpoint_t *e;
lb_worker_t *worker;
jk_endpoint_t endpoint;
};
typedef struct lb_endpoint lb_endpoint_t;
/* Retrieve the parameter with the given name */
static char *get_path_param(jk_ws_service_t *s, const char *name)
{
char *id_start = NULL;
for (id_start = strstr(s->req_uri, name);
id_start; id_start = strstr(id_start + 1, name)) {
if (id_start[strlen(name)] == '=') {
/*
* Session path-cookie was found, get it's value
*/
id_start += (1 + strlen(name));
if (strlen(id_start)) {
char *id_end;
id_start = jk_pool_strdup(s->pool, id_start);
/*
* The query string is not part of req_uri, however
* to be on the safe side lets remove the trailing query
* string if appended...
*/
if ((id_end = strchr(id_start, '?')) != NULL) {
*id_end = '\0';
}
/*
* Remove any trailing path element.
*/
if ((id_end = strchr(id_start, ';')) != NULL) {
*id_end = '\0';
}
return id_start;
}
}
}
return NULL;
}
/* Retrieve the cookie with the given name */
static char *get_cookie(jk_ws_service_t *s, const char *name)
{
unsigned i;
char *result = NULL;
for (i = 0; i < s->num_headers; i++) {
if (strcasecmp(s->headers_names[i], "cookie") == 0) {
char *id_start;
for (id_start = strstr(s->headers_values[i], name);
id_start; id_start = strstr(id_start + 1, name)) {
if (id_start == s->headers_values[i] ||
id_start[-1] == ';' ||
id_start[-1] == ',' || isspace(id_start[-1])) {
id_start += strlen(name);
while (*id_start && isspace(*id_start))
++id_start;
if (*id_start == '=' && id_start[1]) {
/*
* Session cookie was found, get it's value
*/
char *id_end;
++id_start;
id_start = jk_pool_strdup(s->pool, id_start);
if ((id_end = strchr(id_start, ';')) != NULL) {
*id_end = '\0';
}
if ((id_end = strchr(id_start, ',')) != NULL) {
*id_end = '\0';
}
if (result == NULL) {
result = id_start;
}
else {
size_t osz = strlen(result) + 1;
size_t sz = osz + strlen(id_start) + 1;
result =
jk_pool_realloc(s->pool, sz, result, osz);
strcat(result, ";");
strcat(result, id_start);
}
}
}
}
}
}
return result;
}
/* Retrieve session id from the cookie or the parameter */
/* (parameter first) */
static char *get_sessionid(jk_ws_service_t *s)
{
char *val;
val = get_path_param(s, JK_PATH_SESSION_IDENTIFIER);
if (!val) {
val = get_cookie(s, JK_SESSION_IDENTIFIER);
}
return val;
}
static void close_workers(lb_worker_t * p, int num_of_workers, jk_logger_t *l)
{
int i = 0;
for (i = 0; i < num_of_workers; i++) {
p->lb_workers[i].w->destroy(&(p->lb_workers[i].w), l);
}
}
static int JK_METHOD maintain_workers(jk_worker_t *p, jk_logger_t *l)
{
unsigned int i = 0;
lb_worker_t *lb = (lb_worker_t *)p->worker_private;
for (i = 0; i < lb->num_of_workers; i++) {
if (lb->lb_workers[i].w->maintain) {
lb->lb_workers[i].w->maintain(lb->lb_workers[i].w, l);
}
}
return JK_TRUE;
}
static void retry_worker(worker_record_t *w,
int recover_wait_time,
jk_logger_t *l)
{
int elapsed = (int)difftime(time(NULL), w->s->error_time);
JK_TRACE_ENTER(l);
if (elapsed <= recover_wait_time) {
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"worker %s will recover in %d seconds",
w->s->name, recover_wait_time - elapsed);
}
else {
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"worker %s is marked for recover",
w->s->name);
w->s->in_recovering = JK_TRUE;
w->s->in_error_state = JK_FALSE;
w->s->is_busy = JK_FALSE;
}
JK_TRACE_EXIT(l);
}
static worker_record_t *find_by_session(lb_worker_t *p,
const char *name,
jk_logger_t *l)
{
worker_record_t *rc = NULL;
unsigned int i;
for (i = 0; i < p->num_of_workers; i++) {
if (strcmp(p->lb_workers[i].s->name, name) == 0) {
rc = &p->lb_workers[i];
rc->r = &(rc->s->name[0]);
break;
}
}
return rc;
}
static worker_record_t *find_best_bydomain(lb_worker_t *p,
const char *domain,
jk_logger_t *l)
{
unsigned int i;
int total_factor = 0;
jk_u64_t mytraffic = 0;
jk_u64_t curmin = 0;
int bfn = 1;
int bfd = 1;
worker_record_t *candidate = NULL;
if (p->lbmethod == JK_LB_BYTRAFFIC) {
double diff;
time_t now = time(NULL);
/* Update transfer rate for each worker */
for (i = 0; i < p->num_of_workers; i++) {
diff = difftime(now, p->lb_workers[i].s->service_time);
if (diff > JK_SERVICE_TRANSFER_INTERVAL) {
p->lb_workers[i].s->service_time = now;
p->lb_workers[i].s->readed /= JK_SERVICE_TRANSFER_INTERVAL;
p->lb_workers[i].s->transferred /= JK_SERVICE_TRANSFER_INTERVAL;
}
}
}
/* First try to see if we have available candidate */
for (i = 0; i < p->num_of_workers; i++) {
/* Skip all workers that are not member of domain */
if (strlen(p->lb_workers[i].s->domain) == 0 ||
strcmp(p->lb_workers[i].s->domain, domain))
continue;
/* Take into calculation only the workers that are
* not in error state, stopped or not disabled.
*/
if (JK_WORKER_USABLE(p->lb_workers[i].s)) {
if (p->lbmethod == JK_LB_BYREQUESTS) {
p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor;
total_factor += p->lb_workers[i].s->lb_factor;
if (!candidate || p->lb_workers[i].s->lb_value > candidate->s->lb_value)
candidate = &p->lb_workers[i];
}
else if (p->lbmethod == JK_LB_BYTRAFFIC) {
mytraffic = (p->lb_workers[i].s->transferred +
p->lb_workers[i].s->readed ) / p->lb_workers[i].s->lb_factor;
if (!candidate || mytraffic < curmin) {
candidate = &p->lb_workers[i];
curmin = mytraffic;
}
}
else {
/* compare rational numbers: (a/b) < (c/d) iff a*d < c*b
*/
int left = p->lb_workers[i].s->busy * bfd;
int right = bfn * p->lb_workers[i].s->lb_factor;
if (!candidate || (left < right)) {
candidate = &p->lb_workers[i];
bfn = p->lb_workers[i].s->busy;
bfd = p->lb_workers[i].s->lb_factor;
}
}
}
}
if (candidate) {
if (p->lbmethod == JK_LB_BYREQUESTS)
candidate->s->lb_value -= total_factor;
candidate->r = &(candidate->s->domain[0]);
}
return candidate;
}
static worker_record_t *find_best_byrequests(lb_worker_t *p,
jk_logger_t *l)
{
unsigned int i;
int total_factor = 0;
worker_record_t *candidate = NULL;
/* First try to see if we have available candidate */
for (i = 0; i < p->num_of_workers; i++) {
/* If the worker is in error state run
* retry on that worker. It will be marked as
* operational if the retry timeout is elapsed.
* The worker might still be unusable, but we try
* anyway.
*/
if (JK_WORKER_IN_ERROR(p->lb_workers[i].s)) {
retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l);
}
/* Take into calculation only the workers that are
* not in error state, stopped or not disabled.
*/
if (JK_WORKER_USABLE(p->lb_workers[i].s)) {
p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor;
total_factor += p->lb_workers[i].s->lb_factor;
if (!candidate || p->lb_workers[i].s->lb_value > candidate->s->lb_value)
candidate = &p->lb_workers[i];
}
}
if (candidate)
candidate->s->lb_value -= total_factor;
return candidate;
}
static worker_record_t *find_best_bytraffic(lb_worker_t *p,
jk_logger_t *l)
{
unsigned int i;
jk_u64_t mytraffic = 0;
jk_u64_t curmin = 0;
worker_record_t *candidate = NULL;
double diff;
time_t now = time(NULL);
for (i = 0; i < p->num_of_workers; i++) {
diff = difftime(now, p->lb_workers[i].s->service_time);
if (diff > JK_SERVICE_TRANSFER_INTERVAL) {
p->lb_workers[i].s->service_time = now;
p->lb_workers[i].s->readed /= JK_SERVICE_TRANSFER_INTERVAL;
p->lb_workers[i].s->transferred /= JK_SERVICE_TRANSFER_INTERVAL;
}
}
/* First try to see if we have available candidate */
for (i = 0; i < p->num_of_workers; i++) {
/* If the worker is in error state run
* retry on that worker. It will be marked as
* operational if the retry timeout is elapsed.
* The worker might still be unusable, but we try
* anyway.
*/
if (JK_WORKER_IN_ERROR(p->lb_workers[i].s)) {
retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l);
}
/* Take into calculation only the workers that are
* not in error state, stopped or not disabled.
*/
if (JK_WORKER_USABLE(p->lb_workers[i].s)) {
mytraffic = (p->lb_workers[i].s->transferred/p->lb_workers[i].s->lb_factor) +
(p->lb_workers[i].s->readed/p->lb_workers[i].s->lb_factor);
if (!candidate || mytraffic < curmin) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -