jk_lb_worker.c
来自「以便Apache与其他服务进行整合 Mod_JK安装」· C语言 代码 · 共 1,498 行 · 第 1/4 页
C
1,498 行
JK_TRACE_EXIT(l); return &p->lb_workers[0]; } } else { JK_TRACE_EXIT(l); return NULL; } } if (p->lblock == JK_LB_LOCK_PESSIMISTIC) r = jk_shm_lock(); else { JK_ENTER_CS(&(p->cs), r); } if (!r) { jk_log(l, JK_LOG_ERROR, "locking failed (errno=%d)", errno); } if (sessionid) { char *session = sessionid; while (sessionid) { char *next = strchr(sessionid, ';'); char *session_route = NULL; if (next) *next++ = '\0'; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "searching worker for partial sessionid %s", sessionid); session_route = strchr(sessionid, '.'); if (session_route) { ++session_route; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "searching worker for session route %s", session_route); /* We have a session route. Whow! */ rc = find_bysession_route(p, session_route, l); if (rc) { if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); else { JK_LEAVE_CS(&(p->cs), r); } if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "found worker %s (%s) for route %s and partial sessionid %s", rc->s->name, rc->s->route, session_route, sessionid); JK_TRACE_EXIT(l); return rc; } } /* Try next partial sessionid if present */ sessionid = next; rc = NULL; } if (!rc && p->sticky_session_force) { if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); else { JK_LEAVE_CS(&(p->cs), r); } jk_log(l, JK_LOG_INFO, "all workers are in error state for session %s", session); JK_TRACE_EXIT(l); return NULL; } } rc = find_best_worker(p, l); if (p->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); else { JK_LEAVE_CS(&(p->cs), r); } if (rc && JK_IS_DEBUG_LEVEL(l)) { jk_log(l, JK_LOG_DEBUG, "found best worker %s (%s) using method '%s'", rc->s->name, rc->s->route, jk_lb_get_method(p, l)); } JK_TRACE_EXIT(l); return rc;}static void lb_add_log_items(jk_ws_service_t *s, const char *const *log_names, worker_record_t *w, jk_logger_t *l){ const char **log_values = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT); char *buf = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT * JK_LB_UINT64_STR_SZ); if (log_values && buf) { /* JK_NOTE_LB_FIRST/LAST_NAME */ log_values[0] = w->s->name; snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->lb_value); /* JK_NOTE_LB_FIRST/LAST_VALUE */ log_values[1] = buf; buf += JK_LB_UINT64_STR_SZ; snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->elected); /* JK_NOTE_LB_FIRST/LAST_ACCESSED */ log_values[2] = buf; buf += JK_LB_UINT64_STR_SZ; snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->readed); /* JK_NOTE_LB_FIRST/LAST_READ */ log_values[3] = buf; buf += JK_LB_UINT64_STR_SZ; snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->transferred); /* JK_NOTE_LB_FIRST/LAST_TRANSFERRED */ log_values[4] = buf; buf += JK_LB_UINT64_STR_SZ; snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT32_T_FMT, w->s->errors); /* JK_NOTE_LB_FIRST/LAST_ERRORS */ log_values[5] = buf; buf += JK_LB_UINT64_STR_SZ; snprintf(buf, JK_LB_UINT64_STR_SZ, "%d", w->s->busy); /* JK_NOTE_LB_FIRST/LAST_BUSY */ log_values[6] = buf; /* JK_NOTE_LB_FIRST/LAST_ACTIVATION */ log_values[7] = jk_lb_get_activation(w, l); /* JK_NOTE_LB_FIRST/LAST_STATE */ log_values[8] = jk_lb_get_state(w, l); s->add_log_items(s, log_names, log_values, JK_LB_NOTES_COUNT); }}static int JK_METHOD service(jk_endpoint_t *e, jk_ws_service_t *s, jk_logger_t *l, int *is_error){ lb_endpoint_t *p; int attempt = 1; worker_record_t *prec = NULL; int num_of_workers; int first = 1; int was_forced = 0; int rc = -1; char *sessionid = NULL; JK_TRACE_ENTER(l); if (!e || !e->endpoint_private || !s || !is_error) { JK_LOG_NULL_PARAMS(l); if (is_error) *is_error = JK_HTTP_SERVER_ERROR; JK_TRACE_EXIT(l); return JK_FALSE; } p = e->endpoint_private; num_of_workers = p->worker->num_of_workers; /* Set returned error to OK */ *is_error = JK_HTTP_OK; /* set the recovery post, for LB mode */ s->reco_buf = jk_b_new(s->pool); if (!s->reco_buf) { *is_error = JK_HTTP_SERVER_ERROR; jk_log(l, JK_LOG_ERROR, "Failed allocating AJP message"); JK_TRACE_EXIT(l); return JK_SERVER_ERROR; } if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) { *is_error = JK_HTTP_SERVER_ERROR; jk_log(l, JK_LOG_ERROR, "Failed allocating AJP message buffer"); JK_TRACE_EXIT(l); return JK_SERVER_ERROR; } jk_b_reset(s->reco_buf); s->reco_status = RECO_INITED; jk_shm_lock(); if (p->worker->sequence != p->worker->s->sequence) jk_lb_pull(p->worker, l); jk_shm_unlock(); if (p->worker->sticky_session) { /* Use sessionid only if sticky_session is * defined for this load balancer */ sessionid = get_sessionid(s, l); } if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "service sticky_session=%d id='%s'", p->worker->sticky_session, sessionid ? sessionid : "empty"); while (attempt <= num_of_workers && rc == -1) { worker_record_t *rec = get_most_suitable_worker(p->worker, sessionid, s, l); /* Do not reuse previous worker, because * that worker already failed. */ if (rec) { int r; int is_service_error = JK_HTTP_OK; jk_endpoint_t *end = NULL; int retry = 0; int retry_wait = JK_LB_MIN_RETRY_WAIT; s->route = rec->r; prec = rec; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "service worker=%s route=%s", rec->s->name, s->route); if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); if (rec->s->state == JK_LB_STATE_RECOVER) rec->s->state = JK_LB_STATE_PROBE; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); while ((!(r=rec->w->get_endpoint(rec->w, &end, l)) || !end) && (retry < p->worker->s->retries)) { retry++; retry_wait *=2; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); if (retry_wait > JK_LB_MAX_RETRY_WAIT) retry_wait = JK_LB_MAX_RETRY_WAIT; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, "could not get free endpoint for worker" " (retry %d, sleeping for %d ms)", retry, retry_wait); jk_sleep(retry_wait); } if (!r || !end) { /* If we can not get the endpoint * mark the worker as busy rather then * as in error if the retry number is * greater then the number of retries. */ if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); if (rec->s->state != JK_LB_STATE_ERROR) rec->s->state = JK_LB_STATE_BUSY; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); jk_log(l, JK_LOG_INFO, "could not get free endpoint for worker %s (%d retries)", rec->s->name, retry); } else { int service_stat = -1; jk_uint64_t rd = 0; jk_uint64_t wr = 0; /* Reset endpoint read and write sizes for * this request. */ end->rd = end->wr = 0; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); rec->s->elected++; /* Increment the number of workers serving request */ p->worker->s->busy++; if (p->worker->s->busy > p->worker->s->max_busy) p->worker->s->max_busy = p->worker->s->busy; rec->s->busy++; if (rec->s->busy > rec->s->max_busy) rec->s->max_busy = rec->s->busy; if ( (p->worker->lbmethod == JK_LB_METHOD_REQUESTS) || (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) || (p->worker->lbmethod == JK_LB_METHOD_SESSIONS && !sessionid) ) rec->s->lb_value += rec->s->lb_mult; if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_unlock(); service_stat = end->service(end, s, l, &is_service_error); rd = end->rd; wr = end->wr; end->done(&end, l); if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) jk_shm_lock(); /* Update partial reads and writes if any */ rec->s->readed += rd; rec->s->transferred += wr; if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) { rec->s->lb_value += (rd+wr)*rec->s->lb_mult; } else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) { if (rec->s->lb_value >= rec->s->lb_mult) { rec->s->lb_value -= rec->s->lb_mult; } else { rec->s->lb_value = 0; if (JK_IS_DEBUG_LEVEL(l)) { jk_log(l, JK_LOG_DEBUG, "worker %s has load value to low (%" JK_UINT64_T_FMT " < %" JK_UINT64_T_FMT ") ", "- correcting to 0", rec->s->name, rec->s->lb_value, rec->s->lb_mult); } } } /* When returning the endpoint mark the worker as not busy. * We have at least one endpoint free */ if (rec->s->state == JK_LB_STATE_BUSY) rec->s->state = JK_LB_STATE_OK; /* Decrement the busy worker count. * Check if the busy was reset to zero by graceful * restart of the server. */ if (rec->s->busy) rec->s->busy--; if (p->worker->s->busy) p->worker->s->busy--; if (service_stat == JK_TRUE) { rec->s->state = JK_LB_STATE_OK; rec->s->error_time = 0; rc = JK_TRUE; } else if (service_stat == JK_CLIENT_ERROR) { /* * Client error !!! * Since this is bad request do not fail over. */ rec->s->client_errors++; rec->s->state = JK_LB_STATE_OK; rec->s->error_time = 0; jk_log(l, JK_LOG_INFO, "unrecoverable error %d, request failed." " Client failed in the middle of request," " we can't recover to another instance.", is_service_error); *is_error = is_service_error; rc = JK_CLIENT_ERROR; } else { if (is_service_error != JK_HTTP_SERVER_BUSY) { /* * Error is not recoverable - break with an error. */ jk_log(l, JK_LOG_ERROR, "unrecoverable error %d, request failed." " Tomcat failed in the middle of request," " we can't recover to another instance.", is_service_error); *is_error = is_service_error; rc = JK_FALSE; } if (service_stat == JK_REPLY_TIMEOUT) { rec->s->reply_timeouts++; } if (service_stat != JK_STATUS_ERROR && (service_stat != JK_REPLY_TIMEOUT || rec->s->reply_timeouts > (unsigned)p->worker->s->max_reply_timeouts)) { /* * Service failed !!! * Time for fault tolerance (if possible)... */ rec->s->errors++;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?