📄 jk_worker_lb.c
字号:
env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() NullPointerException\n"); return JK_ERR; } /* you can not recover on another load balancer */ s->realWorker=NULL; /* Check for configuration updates */ if( wEnv->shm != NULL && wEnv->shm->head != NULL ) { /* We have shm, let's check for updates. This is just checking one memory location, no lock involved. It is possible we'll read it while somebody else is changing - but that's ok, we just check for equality. */ /* We should check this periodically */ if( wEnv->config->ver != wEnv->shm->head->lbVer ) { wEnv->config->update(env, wEnv->config, NULL ); wEnv->config->ver = wEnv->shm->head->lbVer; } } while(1) { int rc; /* Since there is potential worker state change * make the find best worker call thread safe */ if (lb_priv->cs != NULL) lb_priv->cs->lock(env, lb_priv->cs); rec=jk2_get_most_suitable_worker(env, lb, s, attempt); if (lb_priv->cs != NULL) lb_priv->cs->unLock(env, lb_priv->cs); attempt++; s->is_recoverable_error = JK_FALSE; if(rec == NULL) { /* NULL record, no more workers left ... */ env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.service() all workers in error or disabled state\n"); /* set hwBalanceErr status */ if( lb->hwBalanceErr != 0 ) { s->status=lb->hwBalanceErr; } else { s->status=lb->noWorkerCode; /* SERVICE_UNAVAILABLE is the default */ } if( s->status == 302 ) { s->headers_out->put(env, s->headers_out, "Location", lb->noWorkerMsg, NULL); s->head(env, s ); } else { s->headers_out->put(env, s->headers_out, "Content-Type", "text/html", NULL); s->head(env, s ); s->jkprintf(env, s, lb->noWorkerMsg ); } s->afterRequest( env, s); lb_priv->error_time = time(NULL); return JK_ERR; } if( lb->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "lb.service() try %s\n", rec->mbean->name ); if( rec->route==NULL ) { rec->route=rec->mbean->localName; } s->jvm_route = rec->route; s->realWorker = rec; rc = rec->service(env, rec, s); if(rc==JK_OK) { rec->in_error_state = JK_FALSE; rec->error_time = 0; return JK_OK; } /* If this is a browser connection error dont't check other * workers. */ if (rc == JK_HANDLER_ERROR) { rec->in_error_state = JK_FALSE; rec->error_time = 0; return JK_HANDLER_ERROR; } env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() worker failed %d for %s\n", rc, rec->mbean->name ); /* * Service failed !!! * * Time for fault tolerance (if possible)... */ rec->in_error_state = JK_TRUE; rec->error_time = time(NULL); if(!s->is_recoverable_error) { /* Error is not recoverable - break with an error. */ env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() unrecoverable error...\n"); break; } /* * Error is recoverable by submitting the request to * another worker... Lets try to do that. */ if( lb->mbean->debug > 0 ) { env->l->jkLog(env, env->l, JK_LOG_DEBUG, "lb_worker.service() try other hosts\n"); } } return JK_ERR;}/** Init internal structures. Called any time the config changes*/static int JK_METHOD jk2_lb_refresh(jk_env_t *env, jk_worker_t *lb){ int currentWorker=0; int i; int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); for(i = 0 ; i < num_of_workers ; i++) { char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i); jk_worker_t *w= env->getByName( env, name ); int level=0; int pos=0; if( w== NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.init(): no worker found %s\n", name); continue; } if( w->mbean->disabled ) continue; level=w->level; /* It's like disabled */ if( level >= JK_LB_LEVELS ) continue; pos=lb->workerCnt[level]++; lb->workerTables[level][pos]=w; w->lb_value = w->lb_factor; w->in_error_state = JK_FALSE; } return JK_OK;}static char *jk2_worker_lb_multiValueInfo[]={"worker", NULL };static char *jk2_worker_lb_setAttributeInfo[]={"hwBalanceErr", "noWorkerMsg", "noWorkerCode", NULL };static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean, char *name, void *valueP){ jk_worker_t *lb=mbean->object; char *value=valueP; unsigned i = 0; jk_worker_lb_private_t *lb_priv = lb->worker_private; if( strcmp( name, "worker") == 0 ) { if( lb->lbWorkerMap->get( env, lb->lbWorkerMap, name) != NULL ) { /* Already added */ return JK_OK; } value = lb->mbean->pool->pstrdup(env, lb->mbean->pool, value); lb->lbWorkerMap->add(env, lb->lbWorkerMap, value, ""); if( lb->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "lb_worker.setAttribute(): Adding to %s: %s\n", lb->mbean->localName, value); jk2_lb_refresh( env, lb ); return JK_OK; } else if( strcmp( name, "noWorkerMsg") == 0 ) { lb->noWorkerMsg=value; } else if( strcmp( name, "noWorkerCode") == 0 ) { lb->noWorkerCode=atoi( value ); } else if( strcmp( name, "hwBalanceErr") == 0 ) { lb->hwBalanceErr=atoi( value ); } else if( strcmp( name, "timeout") == 0 ) { lb_priv->timeout=atoi( value ); } else if( strcmp( name, "recovery") == 0 ) { lb_priv->recovery=atoi( value ); } else if( strcmp( name, "stickySession") == 0 ) { lb_priv->sticky_session=atoi( value ); } else if( strcmp( name, "attempts") == 0 ) { lb_priv->attempts=atoi( value ); } return JK_ERR;}static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_bean_t *bean){ jk_worker_t *lb=bean->object; int err; int i = 0; int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); err=jk2_lb_refresh(env, lb ); if( err != JK_OK ) return err; /* if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL) */ /* jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm); */ if( lb->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "lb.init() %s %d workers\n", lb->mbean->name, num_of_workers ); return JK_OK;}static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_bean_t *bean){ jk_worker_t *w=bean->object; /* Workers are destroyed by the workerEnv. It is possible that a worker is part of more than a lb. Nothing to clean up so far. */ return JK_OK;}int JK_METHOD jk2_worker_lb_factory(jk_env_t *env,jk_pool_t *pool, jk_bean_t *result, char *type, char *name){ jk_worker_t *w; int i; jk_bean_t *jkb; jk_worker_lb_private_t *worker_private; if(NULL == name ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.factory() NullPointerException\n"); return JK_ERR; } w = (jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t)); if(w==NULL) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.factory() OutOfMemoryException\n"); return JK_ERR; } jkb=env->createBean2(env, pool,"threadMutex", NULL); if( jkb != NULL ) { w->cs=jkb->object; jkb->init(env, jkb ); } worker_private = (jk_worker_lb_private_t *)pool->calloc(env, pool, sizeof(jk_worker_lb_private_t)); jkb=env->createBean2(env, pool,"threadMutex", NULL); if( jkb != NULL ) { worker_private->cs=jkb->object; jkb->init(env, jkb ); } worker_private->attempts = MAX_ATTEMPTS; worker_private->recovery = WAIT_BEFORE_RECOVER; worker_private->sticky_session = STICKY_SESSION; w->worker_private = worker_private; w->service = jk2_lb_service; for( i=0; i<JK_LB_LEVELS; i++ ) { w->workerCnt[i]=0; } jk2_map_default_create(env,&w->lbWorkerMap, pool); result->init = jk2_lb_init; result->destroy = jk2_lb_destroy; result->setAttribute=jk2_lb_setAttribute; result->multiValueInfo=jk2_worker_lb_multiValueInfo; result->setAttributeInfo=jk2_worker_lb_setAttributeInfo; result->object=w; w->mbean=result; w->hwBalanceErr=0; w->noWorkerCode=503; w->noWorkerMsg=NO_WORKER_MSG; w->workerEnv=env->getByName( env, "workerEnv" ); w->workerEnv->addWorker( env, w->workerEnv, w ); return JK_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -