📄 server.cc
字号:
routeSendMsgFailedDist[3] = 0; routeSendMsgFailedDist[4] = 0; initializedRouteSendMsgFailedDist = true; } if (idChoice > 1) { for (int i = 0; i < idChoice; i++) { double key = getRandomUnusedKey (); allKeys.insert (key); keys.push_back (key); } } id = 0; capacity = 0.; upperTarget = 0.; lowerTarget = 0.; alive = false; birthTime = 0; deathTime = 0; work = 0; rechooseAttempts = 0; thresholdShift = false; thresholdLevel = 0; util = 0; maxUtil = 0; nextLBactionTime = 0; nodeStatUptime = 0; nodeStatRoundsOverloaded = 0; nodeStatTotalWork = 0; nodeStatVsAct = 0; nodeStatArc = 0; nodeStatVsCount = 0; nodeStatCreate = 0; nodeStatDelete = 0; nodeStatRechooseCreate = 0; nodeStatRechooseDelete = 0; nodeStatRechooseChangeId = 0; nodeStatSplit = 0; nodeStatTransferAttempt = 0; nodeStatTransferSuccess = 0; nodeStatQueryIn = 0; nodeStatQueryOut = 0; nodeStatMaintIn = 0; nodeStatMaintOut = 0;}PhysicalServer::~PhysicalServer () { deleteLocalVServers (false); deleteLocalVServers (true);}/* * PS:setCapacity is called at the beginning of each trial * to reset the PS to a known initial state. */void PhysicalServer::setNode (int myId, double c) { id = myId; alive = false; birthTime = -1; work = 0; util = 0.; maxUtil = 0.; localVs.clear(); usedKeys.clear (); haveDeletedVS = false; nodeStatUptime = 0; nodeStatRoundsOverloaded = 0; nodeStatTotalWork = 0; nodeStatVsAct = 0; nodeStatArc = 0.; nodeStatVsCount = 0; nodeStatCreate = 0; nodeStatDelete = 0; nodeStatRechooseCreate = 0; nodeStatRechooseDelete = 0; nodeStatRechooseChangeId = 0; nodeStatSplit = 0; nodeStatTransferAttempt = 0; nodeStatTransferSuccess = 0; nodeStatQueryIn = 0; nodeStatQueryOut = 0; nodeStatMaintIn = 0; nodeStatMaintOut = 0; if (c <= 0) c = 1; this->capacity = c; upperTarget = upperSlack*capacity; lowerTarget = lowerSlack*capacity; if (upperTarget <= .5) { upperTarget = .5; } if (lowerTarget <= 0.) { lowerTarget = 0.; } if (capacity <= 1.) { capacityIndex = 0; } else if (capacity <= 10.) { capacityIndex = 1; } else if (capacity <= 100.) { capacityIndex = 2; } else if (capacity <= 1000.) { capacityIndex = 3; } else { capacityIndex = 4; } }int util2thresholdLevel (double util) { if (util <= thresholdInitialConstant) return 0; return (int)(floor(log (util/thresholdInitialConstant)/logOfThresholdDelta));}int PhysicalServer::endOfRound () { int workThisRound = work; //本轮结束后记录相关情况 if (alive) { ASSERT (localVs.size() > 0); util = getWorkPerRound() / capacity;//本轮节点的利用率 if (recordingStats) { nodeStatUptime++; if (util > maxUtil) {//目前为止最大的负载利用率 maxUtil = util; } if (util > 1.) {//超级过载,我认为应该if(isOverloaded()) nodeStatRoundsOverloaded++; //过载记录 } nodeStatTotalWork += work; //总负载 // can be time consuming // nodeStatArc += getArcLength(); nodeStatVsCount += getVsCount (); } } //为下一轮初始记录 work = 0;//物理节点负载清0 for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) {//各个虚拟节点负载清0 (*p)->endOfRound(); } return workThisRound;}//每轮后计算该虚拟节点的负载void VirtualServer::endOfRound () { if (workHistory < 0) {//set work history after the first round workHistory = (double)workCurrent; DEBUG_P (("R %d PS %d wH neg\n", cRound, rootPs->getId())); } else {// 本轮负载和历时负载的比例1:9 workHistory = EWMA_LAMBDA * workCurrent + (1.-EWMA_LAMBDA) * workHistory; } DEBUG_P (("R %d PS %d VS %1.10f wH %4.4f wC %5d\n", cRound, rootPs->getId(), key, workHistory, workCurrent)); //下一轮的工作负载初始化 workCurrent = 0;}void printUtilHistogram (FILE *fp, PhysicalServer *ps) { int b01 = 0, b1 = 0, b10 = 0, b100 = 0, b1000 = 0, bg1000 = 0; for (int i = 0; i < psCount; i++) { if (ps[i].isAlive()) { if (ps[i].getUtil() < 0.1) b01++; else if (ps[i].getUtil() < 1.) b1++; else if (ps[i].getUtil() < 10) b10++; else if (ps[i].getUtil() < 100) b100++; else if (ps[i].getUtil() < 1000) b1000++; else bg1000++; } } fprintf (fp,"R %d Util: %d [0.1] %d [1] %d [10] %d [100] %d [1000] %d\n", cRound, b01, b1, b10, b100, b1000, bg1000);}/* * Returns true if our work for this round was above our * target upper bound. */bool PhysicalServer::isOverloaded () { if (getUtil() > upperSlack) { DEBUG_P (("R %d PS %d overloaded\n", cRound, id)); return true; } return false;}/* * Returns true if our work for this round was below our * target lower bound. */bool PhysicalServer::isUnderloaded () { if (getUtil() < lowerSlack) return true; return false;}/* * Returns the number of virtual servers this PS current has. */int PhysicalServer::getVsCount () { return localVs.size();}int PhysicalServer::getDeathtime(){ return deathTime;}/* * Birth of the PS. Initializes some variables and creates * an initial number of VSs. * 物理节点加入处理 * 参数为初始虚拟节点数量 */int PhysicalServer::birth (int initialVsPerNode) { int createCount = 0; //printf("count :%d\n",getId()); ASSERT (!alive); birthTime = cRound; deathTime = -1;//死掉时间未定 util = 0.; usedKeys.clear (); haveDeletedVS = false; rechooseAttempts = 0; thresholdLevel = 0; thresholdShift = false; setNextLBactionTime (); alive = true; currentTargetCapacity += (double)(upperTarget); systemTargetCapacity += capacity; DEBUG_P (("R %d PS %d starting birth\n", cRound, id)); if (!oracle) { double VsDesiredWork = 0.; double VsPredictedWork = 0.; if (idChoice == 1) {//普通的虚拟节点创建策略 //直接创建初始数量的虚拟节点 // 注意: cround=0 不合并虚拟节点 for (int i = 0; i < initialVsPerNode; i++) { if (createVs (initialVsPerNode, VsDesiredWork, VsPredictedWork)) { createCount++; } } } else {//k-choices 中虚拟节点id 选择大小 int maxToCreate = 3; if (maxToCreate > idChoice) maxToCreate = idChoice / 2; if (cRound < earliestLBround) maxToCreate = 1; bool createOK = true; VsDesiredWork = (upperTarget - lowerTarget) / 2. + lowerTarget; ASSERT (VsDesiredWork > 0.); for (int i = 0; createCount == 0 || (createOK && VsDesiredWork > 0 && maxToCreate > i); i++) { VsPredictedWork = 0.; createOK = createVs (initialVsPerNode, VsDesiredWork, VsPredictedWork); if (createOK) { createCount++; VsDesiredWork -= VsPredictedWork; } } } } psUp++;//存在的物理节点数增加 DEBUG_P (("R %d PS %d finished birth arc %f psUp %d cap %f (l %f u %f) lvsSize %d sysCap %f\n", cRound, id, getArcLength(), psUp, capacity, lowerTarget, upperTarget, localVs.size(), currentTargetCapacity)); if (recordingStats) { nodeStatVsCount += createCount; } // createCount can be 0 if in oracle mode return createCount;}int PhysicalServer::rebirth () { alive=true; for (set<VirtualServer*>::iterator p = localVs.begin();p != localVs.end(); p++) { VirtualServer *vs = *p; //DEBUG_P (("PS %d removing key %f\n", id, vs->getKey())); map<double,VirtualServer*>::iterator q = tempdeadvServers.find ((*p)->getKey()); tempdeadvServers.erase(q); vServers.insert(pair<double,VirtualServer*>(q->first,q->second)); } return getVsCount();}int PhysicalServer::foreverdeath () { return deleteLocalVServers (true); }/* * Death of the PS. All of its VSs are removed from the system. * 返回删除该物理节点所有的虚拟节点数量 */int PhysicalServer::death () { int session = cRound - birthTime; //该节点存在时长 DEBUG_P (("PS %d death now %d psUp %d vsSize %d\n", id, cRound, psUp, localVs.size())); if (!alive) {//若已经死掉 printf ("PS %d death now %d psUp %d vsSize %d\n", id, cRound, psUp, localVs.size()); ASSERT (alive); } //死掉后的状态处理 alive = false;//死掉标记 deathTime = cRound;//当前轮该节点死掉 psUp--;//存在的物理节点减少 work = 0; currentTargetCapacity -= (double)(upperTarget); systemTargetCapacity -= capacity; // two sanity checks if (psUp <= 1) { //至少存在两个物理节点 printf ("round is %d\n", cRound); } ASSERT (psUp > 1); ASSERT (currentTargetCapacity > 0.); //删除该物理节点的所有的虚拟节点 int removedCount = deleteLocalVServers (false); ASSERT (vServers.size () > 0); if (recordingStats) { nodeStatVsCount += removedCount; } return removedCount;} //物理节点所有的虚拟节点退出int PhysicalServer::deleteLocalVServers (bool forever) { int vServerRemoved = localVs.size(); set<VirtualServer*> localVsCopy (localVs);//拷贝虚拟节点集合 //对每个虚拟节点分别移除if(!forever)//先从vServers中移出,插入到deadvServers;{ for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) { VirtualServer *vs = *p; DEBUG_P (("PS %d removing key %f\n", id, vs->getKey())); map<double,VirtualServer*>::iterator q = vServers.find ((*p)->getKey()); //q 为所要删除虚拟节点开始的列表叠代 int vsDelta = 0; // removeVs (q, vsDelta);//删除该虚拟节点的处理过程 } //localVs.clear (); }else{ for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) { VirtualServer *vs = *p; vs->mBlock.clear(); //DEBUG_P (("PS %d removing key %f\n", id, vs->getKey())); map<double,VirtualServer*>::iterator q = tempdeadvServers.find ((*p)->getKey()); tempdeadvServers.erase(q); } localVs.clear ();} return vServerRemoved;}/* * getRandomKey () * * Returns an unused key from this PS's collection */double PhysicalServer::getRandomKey () { if (idChoice <= 1) {//0, 1 double rid = getRandomUnusedKey (); allKeys.insert (rid); return rid; } else { vector<double> unusedKeys = getSampleKeys(); ASSERT (unusedKeys.size() > 0); int randIndex = unifRand (0,unusedKeys.size()); return unusedKeys[randIndex]; }}/* * getSampleKeys () * * Returns all unused keys from this PS's collection */vector<double> PhysicalServer::getSampleKeys () { vector<double> unusedKeys; if (rechooseFlag) printf ("sample keys"); for (int i = 0; i < keys.size(); i++) { if (rechooseFlag) printf (" %f (%d)", keys[i], i); bool valid = true; for (set<VirtualServer*>::iterator p = localVs.begin(); valid && p != localVs.end(); p++) { if ((*p)->getKey() == keys[i]) { valid = false; if (rechooseFlag) printf ("(in use)"); } } if (dampeningLimitIds) { for (set<double>::iterator p = usedKeys.begin(); valid && p != usedKeys.end(); p++) { if (*p == keys[i]) { if (rechooseFlag) printf ("(prior)"); valid = false; } } } if (valid) { unusedKeys.push_back (keys[i]); } } if (rechooseFlag) printf ("\n"); return unusedKeys;}/* PS::createVs creates one VS and adds it to * this PS. * * vsCreateCount is for when more than one Vs is being * created at a time (e.g., at birth), and we want each * Vs to get one nth of the node's target capacity. * * The new VSs key is either chosen optimally (idChoice == 0), * with the cost function "score", or at random.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -