📄 server.cc
 *
 * The function returns false if the PS was the successor to
 * the VS being created and the creation was aborted.
 *
 * If the creation was successful, it returns true.
 *
 * predictedWork is only set if ID selection is turned on.
 */
double PhysicalServer::chooseKey (vector<double> keyList, double desiredWork,
                                  double &predictedWork, double skipKey,
                                  double &score, double &gap)
{
    double key = -1;
    ASSERT (idChoice != 1);
    ASSERT ((idChoice == 0) || (idChoice > 1 && keyList.size() > 0));

    if (cRound > earliestLBround && idChoice == 0 && vServers.size() > 1) {
        // returns predicted work in desiredWork variable
        key = findOptKey (desiredWork, predictedWork, score, gap);
    } else if (idChoice > 1 && vServers.size() > 1) {
        if (cRound < earliestLBround) {
            return keyList[0];
        }
        score = -1.;
        predictedWork = -1.;
        gap = 1.;
        DEBUG_P (("PS %d desiredWork %f\n", id, desiredWork));
        // walk the set of unused candidate key ids and pick the best one
        // according to findScore
        for (int i = 0; i < keyList.size(); i++) {
            if (keyList[i] == skipKey) continue;
            double thisPredictedWork = 0.;
            double thisGap = 0.;
            double thisScore = findScore (keyList[i], desiredWork,
                                          thisPredictedWork, thisGap);
            if (predictedWork < 0. || thisScore < score) {
                // all other things being equal, go to the biggest gap
                scoreCountStat++;
                if (predictedWork < 0.) {
                    scorePredictedWorkStat++;
                } else if (thisScore < score) {
                    scoreBetterScoreStat++;
                } else if (thisScore == score && gap > thisGap) {
                    scoreBetterDistanceStat++;
                }
                key = keyList[i];
                score = thisScore;
                predictedWork = thisPredictedWork;
                gap = thisGap;
                ASSERT (gap > 0. && gap < 1.);
                ASSERT (predictedWork >= 0.);
            }
        }
        DEBUG_P (("PS %d choose key %f target %f future %f score %f sysCap %f\n",
                  id, key, desiredWork, predictedWork, score, systemTargetCapacity));
        ASSERT (predictedWork >= 0.);
    } else {
        key = getRandomKey ();
    }
    return key;
}
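/*
 * Illustrative sketch, not part of the original server.cc: keys live on the
 * unit ring [0, 1) (see the asserts on key and gap above), and quantities
 * such as "gap" and distToSuccessor() are clockwise distances on that ring.
 * A plausible helper for that metric could look like this; the name
 * ringDistance is an assumption used for illustration only.
 */
static double ringDistance (double from, double to)
{
    double d = to - from;
    if (d <= 0.) d += 1.;   // wrap around the ring
    return d;               // clockwise distance, in (0, 1]
}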
bool PhysicalServer::createVs (int vsCreateCount, double desiredWork,
                               double &predictedWork)
{
    ASSERT (alive);
    setNextLBactionTime ();   // schedule the time of the next LB action
    double key;
    vector<double> keyList;

    if (idChoice != 1) {      // k-choices: idChoice == 0, 2, 3, ...
        if (desiredWork <= 0.) {
            // make sure we've created at least one VS for this guy
            ASSERT (localVs.size() > 0);
            return false;
        }
        if (idChoice > 1) {
            if (localVs.size() >= idChoice) {
                return false;
            }
            keyList = getSampleKeys();
        }
        statKeyListSize.push_back ((double)(keyList.size()));
        double cost, gap;
        // pick the key
        key = chooseKey (keyList, desiredWork, predictedWork, -1., cost, gap);
    } else {                  // idChoice == 1: assign the VS an unused random id
        key = getRandomKey ();
    }

    if (key < 0. || key >= 1.) {   // sanity-check the id
        printf ("key %f\n", key);
        ASSERT (key >= 0. && key < 1.);
    }

    // This key has already been checked for uniqueness
    allKeys.insert (key);
    bool createdOK = true;

    // At round zero there is no need to check for merges when a VS joins;
    // merging is done in a single pass after all virtual servers have joined.
    if (cRound == 0) {
        // Do not merge VSs right at beginning
        DEBUG_P (("PS %d create Vs key %f\n", id, key));
        addVs (key);
    } else {
        // In any later round, check whether the newly joined VS triggers a merge.
        map<double,VirtualServer*>::iterator succ, pred;
        succ = vServers.upper_bound (key);
        // e.g. if the map already holds 1, 2, 3, 4: lower_bound(2) returns 2,
        // while upper_bound(2) returns 3; begin() points at the first element,
        // and end() points one past the last element, not at the last element.
        if (succ == vServers.end())
            succ = vServers.begin();
        if (succ->second->getRootPs() != this) {
            // The successor VS lives on a different physical server,
            // so the new VS is actually created.
            // merge, if we own our predecessor
            // compute the predecessor
            pred = succ;
            if (pred == vServers.begin()) {
                pred = vServers.end();   // note that end() is not the last element
            }
            pred--;
            DEBUG_P (("PS %d create Vs key %f\n", id, key));
            VirtualServer* newVs = addVs (key);
            if (pred->second->getRootPs() == this) {
                // The predecessor VS lives on this physical server as well,
                // so merge it into the newly created VS.
                DEBUG_P (("PS %d merging %f into %f\n", id, pred->first, key));
                newVs->merge (pred->second);
                int vsCount = 0;
                deleteLocalVs (pred, vsCount);
                createdOK = false;
            }
        } else {
            // This physical server already owns the successor of this key,
            // so there is no need to create the VS.
            DEBUG_P (("PS %d aborting creation of %f because we are successor\n",
                      id, key));
            createdOK = false;
        }
    }
    return createdOK;
}
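/*
 * Illustrative sketch, not part of the original server.cc: the wrap-around
 * neighbour lookup that createVs, removeVs and takeVs all perform on the
 * global key -> VS map. upper_bound(key) yields the first entry strictly
 * greater than key (the successor on the ring); stepping one entry back,
 * with wrap-around at begin()/end(), yields the predecessor. The helper
 * name and signature are assumptions for illustration only.
 */
static void findRingNeighbours (map<double,VirtualServer*> &ring, double key,
                                map<double,VirtualServer*>::iterator &succ,
                                map<double,VirtualServer*>::iterator &pred)
{
    succ = ring.upper_bound (key);
    if (succ == ring.end())
        succ = ring.begin();      // wrap: the successor of the largest key
    pred = succ;
    if (pred == ring.begin())
        pred = ring.end();        // wrap before stepping back
    pred--;                       // the last element precedes the first
}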
/*
 * PS::splitVs.
 * Enter with exactly one VS; PS is currently overloaded.
 * Create a new VS at ID that is halfway between current VS
 * and the next VS (that is residing on some other node).
 *
 * If the IDs are too tightly packed and this new "halfway" ID
 * is already in the system, abort the split.  This happens
 * very rarely.  Only called in the transfer scheme.
 */
bool PhysicalServer::splitVs ()
{
    setNextLBactionTime ();
    set<VirtualServer*>::iterator p = localVs.begin();
    VirtualServer *vs = *p;
    ASSERT (localVs.size() == 1 && vs);
    double domain = distToSuccessor (vs);
    double key = vs->getKey() + domain / 2.;
    if (key > 1.)
        key = key - 1.;
    DEBUG_P (("PS %d split Vs newkey %f domain %f\n", id, key, domain));

    // It can happen that there just isn't any room for a key here.
    // In this case, we just return false.
    if (vServers.find(key) != vServers.end()) {
        return false;
    }
    // check that this key isn't currently in use and remove it from
    // the free list if it isn't in use
    // it is not possible that the key is in use, because then we would
    // have picked a different key.
    addVs (key);
    if (recordingStats) {
        nodeStatVsCount++;
        nodeStatSplit++;
    }
    return true;
}

/*
 * PS::addVs.  Enter with an allocated, checked key.
 * Do the actual VS creation and adding to local data structures.
 */
VirtualServer* PhysicalServer::addVs (double key)
{
    VirtualServer *vs = new VirtualServer (key, this);        // create the new VS
    ASSERT (vs);
    vServers.insert (pair<double,VirtualServer*>(key, vs));   // add to the global VS map
    localVs.insert (vs);                                       // add to this PS's local VS set
    return vs;
}

/*
 * PS::deleteLocalVs is a helper function to remove a VS in merge cases.
 */
void PhysicalServer::deleteLocalVs (map<double,VirtualServer*>::iterator p, int &vsDelta)
{
    ASSERT (alive);
    set<VirtualServer*>::iterator q = localVs.find (p->second);
    ASSERT (q != localVs.end());
    removeVs (p, vsDelta);
    localVs.erase (q);
}

/*
 * PS::deleteVs chooses the best VS to delete and deletes it.
 * The best VS is the one most likely to push us below our
 * upper target threshold.
 */
void PhysicalServer::deleteVs (int &vsDelta)
{
    ASSERT (alive);
    setNextLBactionTime ();
    // we get here because we are currently overloaded,
    // not because the sum of the VSs histories say we are
    double amtToLose = (double)(upperTarget - getWorkPerRound());
    set<VirtualServer*>::iterator bestVs = localVs.end();
    set<VirtualServer*>::iterator maxVs = localVs.begin();
    for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) {
        VirtualServer *vs = *p;
        if (vs->getWorkPerRound() > amtToLose) {
            if (bestVs == localVs.end() ||
                (*bestVs)->getWorkPerRound() > vs->getWorkPerRound()) {
                bestVs = p;
            }
        } else if ((*maxVs)->getWorkPerRound() < vs->getWorkPerRound()) {
            maxVs = p;
        }
    }
    if (bestVs == localVs.end()) {
        bestVs = maxVs;
    }
    // remember the key before erasing: bestVs must not be dereferenced
    // once it has been removed from localVs
    double bestKey = (*bestVs)->getKey();
    removeVs (vServers.find (bestKey), vsDelta);
    localVs.erase (bestVs);
    // tell those who are pointing to us
    gracefulRelocateVServers.insert (bestKey);
    return;
}

/*
 * PS::removeLocalVs is used in merge cases where we need to tell
 * another PS that one of its VSs is being removed due to a merge.
 * Called only from removeVs.
 */
void PhysicalServer::removeLocalVs (VirtualServer *vs)
{
    localVs.erase (vs);
}

/*
 * PS::removeVs removes a virtual server.
 * The parameter p is the iterator into the global VS map that points
 * at the VS to be removed.
 */
void PhysicalServer::removeVs (map<double,VirtualServer*>::iterator p, int &vsDelta)
{
    vsDelta--;
    // VirtualServer *vs, *predVs;
    map<double,VirtualServer*>::iterator succ, pred;
    DEBUG_P (("PS %d remove Vs %f\n", id, p->first));   // p->first: vs key
    vs = p->second;                                     // p->second: virtual server
    double key = vs->getKey();
    deadVServers.insert (key);   // record the removed VS in the set of dead VSs

    // Removing this VS may leave its two neighbours on the ring owned by the
    // same physical server, in which case they have to be merged.
    succ = p;
    succ++;                      // the VS that follows p on the ring
    if (succ == vServers.end())
        succ = vServers.begin();
    // keep the dead VS around for now, then drop it from the global map;
    // p must not be dereferenced after the erase
    tempdeadvServers.insert (pair<double,VirtualServer*>(key, vs));
    vServers.erase (p);
    /* not deleted for the time being:
    delete vs;
    vs = NULL; */
    pred = succ;
    if (pred == vServers.begin())
        pred = vServers.end();
    pred--;
    if (!shuttingDown && succ->second->getRootPs() == pred->second->getRootPs()) {
        // The predecessor and successor now belong to the same physical
        // server, so merge them by removing the predecessor VS.
        predVs = pred->second;
        DEBUG_P (("PS %d removeVs, PS %d doing a merge: removing %f, merging %f and %f\n",
                  id, predVs->getRootPs()->getId(), key,
                  pred->second->getKey(), succ->second->getKey()));
        // ASSERT (succ->second->getRootPs()->getId() != id);
        // set<VirtualServer*>::iterator q = lVs.find (pred->second);
        // ASSERT (q != lVs.end());
        deadVServers.insert (predVs->getKey());
        vServers.erase (predVs->getKey());
        predVs->getRootPs()->removeLocalVs (predVs);
        delete predVs;
        predVs = NULL;
        vsDelta--;
    }
    return;
}

bool PhysicalServer::sendMaintMsg (double dstKey)
{
    VirtualServer *ourSrc = getRandomVs();
    vector<double> hops;
    vector<int> fingersUsed;
    return ourSrc->route (dstKey, hops, fingersUsed, false);
}

/*
 * transfer scheme
 * PS::transferVs picks a node at random and negotiates which
 * VS to transfer to it, if one exists
 */
bool PhysicalServer::transferVs (int &vsDelta)
{
    ASSERT (alive);
    if (recordingStats)
        nodeStatTransferAttempt++;
    transferAttempt++;           // attempt to migrate load
    setNextLBactionTime ();
    PhysicalServer *dstPs = this;
    VirtualServer *dstVs = NULL;
    // don't transfer to ourselves
    double dstKey = -1.;
    while ((dstPs == this) && (dstKey = randPct())) {
        dstVs = findDst (dstKey);
        dstPs = dstVs->getRootPs();
    }
    if (!sendMaintMsg (dstKey)) {
        if (debug)
            printf ("R %d PS %d failed to contact random node %f at PS %d to transfer to\n",
                    cRound, id, dstKey, dstPs->getId());
        transferMsgSendFail++;
        return false;
    }
    double extraCapacity = dstPs->getExtraCapacity();
    DEBUG_P (("PS %d (work %f, l %d u %d) transfer (randKey %f) to PS %d, extra capacity %f\n",
              id, getWorkPerRound(), lowerTarget, upperTarget,
              dstVs->getKey(), dstPs->getId(), extraCapacity));
    if (extraCapacity <= 0.) {
        transferNoCapacityFail++;
        return false;
    }
    set<VirtualServer*>::iterator lightestPtr, heaviestPtr, xferPtr;
    lightestPtr = localVs.end();
    heaviestPtr = localVs.end();
    double lightestVsWork = extraCapacity;
    double heaviestVsWork = 0.;
    set<VirtualServer*>::iterator p;
    for (p = localVs.begin(); p != localVs.end(); p++) {
        VirtualServer *vs = *p;
        if (vs->getWorkPerRound() < extraCapacity) {
            if (getWorkPerRound() - vs->getWorkPerRound() < (double)upperTarget) {
                if (vs->getWorkPerRound() < lightestVsWork) {
                    lightestPtr = p;
                    lightestVsWork = vs->getWorkPerRound();   // track the running minimum
                }
            } else {
                if (heaviestPtr == localVs.end() || heaviestVsWork < vs->getWorkPerRound()) {
                    heaviestPtr = p;
                    heaviestVsWork = vs->getWorkPerRound();
                }
            }
        }
    }
    if (lightestPtr != localVs.end()) {
        xferPtr = lightestPtr;
    } else if (heaviestPtr != localVs.end()) {
        xferPtr = heaviestPtr;
    } else {
        transferNoVsMatchFail++;
        return false;
    }
    // we are moving the VS along with all of its state
    (*xferPtr)->setRootPs (dstPs);
    dstPs->takeVs (*xferPtr, vsDelta);
    localVs.erase (xferPtr);
    // incoming pointers need to be notified of new host
    // gracefulRelocateVServers.insert ((*xferPtr)->getKey());
    if (recordingStats)
        nodeStatTransferSuccess++;
    transferSuccess++;
    return true;
}
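/*
 * Illustrative sketch, not part of the original server.cc: the adjacency
 * property that the merge logic in createVs, removeVs and takeVs works to
 * maintain (and relaxes while shuttingDown) -- no two ring-adjacent virtual
 * servers should belong to the same physical server, since such pairs get
 * merged.  The checker name is an assumption for illustration only.
 */
static bool ringAdjacencyHolds (map<double,VirtualServer*> &ring)
{
    if (ring.size() < 2)
        return true;
    map<double,VirtualServer*>::iterator p = ring.begin(), q = p;
    for (++q; q != ring.end(); ++p, ++q) {
        if (p->second->getRootPs() == q->second->getRootPs())
            return false;        // adjacent VSs on the same PS: should have merged
    }
    // the ring wraps around: also compare the last entry with the first
    return ring.rbegin()->second->getRootPs() != ring.begin()->second->getRootPs();
}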
/*
 * PS::takeVs does the work of transferring a VS from one node
 * to another, including merging if necessary.
 */
void PhysicalServer::takeVs (VirtualServer *vs, int &vsDelta)
{
    map<double,VirtualServer*>::iterator us, succ, pred;
    ASSERT (vs);
    us = vServers.find (vs->getKey());
    ASSERT (us != vServers.end());
    pred = us;
    if (us == vServers.begin()) {
        pred = vServers.end();
    }
    pred--;
    if (pred->second->getRootPs() == this) {
        vs->merge (pred->second);
        deleteLocalVs (pred, vsDelta);
    }
    succ = us;
    succ++;
    if (succ == vServers.end()) {
        succ = vServers.begin();
    }
    if (succ->second->getRootPs() == this) {
        succ->second->merge (vs);
        removeVs (us, vsDelta);
    } else {
        localVs.insert (vs);