📄 server.cc
字号:
} return;}VirtualServer* PhysicalServer::getRandomVs () { // pick random vs to try int index = unifRand(0,localVs.size()); set<VirtualServer*>::iterator p = localVs.begin(); for (int i = 0; i < index; i++) { p++; } ASSERT (p != localVs.end()); VirtualServer *vs = *p; ASSERT (vs); return vs;}// Ganesan's Threshold Algorithmint PhysicalServer::thresholdVs (int &vsDelta) { // hard-coded vsAct: // 0 = nothing - threshold failed // 1 = did neighbor adjust // 2 = did reorder thresholdShift = false; setNextLBactionTime (); ASSERT (localVs.size() == 1); set<VirtualServer*>::iterator ourLocalVsIter = localVs.begin(); VirtualServer *ourVs = *ourLocalVsIter; if (ourVs->rootPs->getUtil() < 0) return false; thresholdAttempt++; // reorder is called from within neighborAdjust return neighborAdjust (ourVs);}int PhysicalServer::neighborAdjust (VirtualServer *ourVs) { // first compare with neighbor map<double,VirtualServer*>::iterator ourIter, succIter, predIter, moveMeIter; bool moveUsTowardPred = false, movePredTowardUs = false; ourIter = vServers.find(ourVs->getKey()); ASSERT (ourIter != vServers.end()); predIter = ourIter; succIter = ourIter; succIter++; if (succIter == vServers.end()) succIter = vServers.begin(); if (predIter == vServers.begin()) predIter = vServers.end(); predIter--; VirtualServer *predVs = predIter->second; VirtualServer *succVs = succIter->second; int ourLevel = ourVs->rootPs->getThresholdLevel(); int succLevel = succVs->rootPs->getThresholdLevel(); int predLevel = predVs->rootPs->getThresholdLevel(); if (ourLevel > succLevel && ourLevel > predLevel) { if (succLevel > predLevel) { // succ has more load than pred movePredTowardUs = true; } else { moveUsTowardPred = true; } } else if (ourLevel > succLevel) { moveUsTowardPred = true; } else if (ourLevel > predLevel) { movePredTowardUs = true; } else { if (debugG) printf ("R %d PS %d neighb-adj fail-level p %f %d us %f %d s %f %d\n", cRound, id, predVs->getKey(), predLevel, ourVs->getKey(), ourLevel, succVs->getKey(), succLevel); reorderAttempt++; if (reorder (ourVs)) { return 2; } return 0; } double oldKey = -1.; double newKey = -1.; VirtualServer *moveMeVs = NULL; if (moveUsTowardPred) { moveMeIter = ourIter; moveMeVs = ourVs; oldKey = ourVs->getKey(); newKey = oldKey - keyDistance(predVs->getKey(),oldKey)*(1.-1./thresholdDelta); if (newKey < 0.) newKey += 1.; if (debugG) printf ("moveUsTowardPred: "); } else if (movePredTowardUs) { moveMeIter = predIter; moveMeVs = predVs; oldKey = predVs->getKey(); newKey = oldKey + keyDistance(oldKey,ourVs->getKey())*(1.-1./thresholdDelta); if (newKey > 1.) newKey -= 1.; if (debugG) printf ("movePredTowardUs: "); } else { ASSERT (0); } // OK, change the key if (vServers.find (newKey) != vServers.end()) { // abort if the keys are too tightly packed if (debugG) printf ("R %d PS %d neighb-adj fail-key p %1.14f %d us %1.14f %d (tried %1.14f, util %f) s %1.14f %d\n", cRound, id, predVs->getKey(), predLevel, ourVs->getKey(), ourLevel, newKey, ourVs->rootPs->getUtil(), succVs->getKey(), succLevel); return 0; } vServers.erase (moveMeIter); moveMeVs->setKey (newKey); vServers.insert(pair<double,VirtualServer*>(newKey,moveMeVs)); gracefulRelocateVServers.insert (oldKey); if (debugG) printf ("R %d PS %d neighb-adj from %1.14f to %1.14f, our key %1.14f, p %d level %d our-level %d s %d level %d\n", cRound, id, oldKey, newKey, ourVs->getKey(), predVs->rootPs->getId(), predLevel, ourLevel, succVs->rootPs->getId(), succLevel); neighborAdjustSuccess++; return 1;}bool PhysicalServer::reorder (VirtualServer *ourVs) { // else compare with some random nodes // The only way for this guy to get a random sample is to // pick numbers at random. // He can't pick physical servers uniformly at random. double bestUtil = 1.; double bestKey = -1.; double ourWpr = ourVs->getWorkPerRound(); VirtualServer *bestVs; map<double,VirtualServer*>::iterator bestIter; for (int i = 0; i < thresholdSampleSize; i++) { double sampleKey = randPct(); map<double,VirtualServer*>::iterator sampleIter, sampleSuccIter; sampleIter = vServers.upper_bound (sampleKey); if (sampleIter == vServers.end()) { sampleIter = vServers.begin(); } sampleSuccIter = sampleIter; sampleSuccIter++; if (sampleSuccIter == vServers.end()) { sampleSuccIter = vServers.begin(); } VirtualServer *sampleVs = sampleIter->second; VirtualServer *sampleSuccVs = sampleSuccIter->second; double sampleWpr = sampleVs->getWorkPerRound(); double sampleSuccWpr = sampleSuccVs->getWorkPerRound(); if (debugG) printf ("R %d PS %d reorder sample %f w %f l %d succ %f w %f l %d sampleCap %f succCap %f", cRound, id, sampleVs->getKey(), sampleWpr, sampleVs->rootPs->getThresholdLevel(), sampleSuccVs->getKey(), sampleSuccWpr, sampleSuccVs->rootPs->getThresholdLevel(), sampleVs->rootPs->getCapacity(), sampleSuccVs->rootPs->getCapacity()); // in absolute terms, will this overload the sample's successor? if (sampleWpr + sampleSuccWpr > sampleSuccVs->rootPs->getUpperTarget()) { if (debugG) printf (" BAD\n"); continue; } // in relative terms, pick the one with the lowest util double sampleUtil = sampleVs->rootPs->getUtil(); if (sampleUtil < bestUtil) { bestUtil = sampleUtil; bestKey = sampleKey; bestIter = sampleIter; bestVs = sampleVs; if (debugG) printf (" BEST\n"); } else { if (debugG) printf (" NOT BEST\n"); } } if (bestKey >= 0.) { double oldKey = bestKey; double ourPredKey = (findPred (ourVs->getKey()))->getKey(); double newKey = ourPredKey + keyDistance (ourPredKey,ourVs->getKey())/2.; if (newKey >= 1.) newKey -= 1.; if (vServers.find (newKey) != vServers.end()) { if (debugG) printf ("R %d PS %d reorder failed (packed keys) from %f to %f (PS %d)\n", cRound, id, oldKey, newKey, bestVs->rootPs->getId()); return false; } vServers.erase (bestIter); bestVs->setKey (newKey); vServers.insert(pair<double,VirtualServer*>(newKey,bestVs)); gracefulRelocateVServers.insert (oldKey); if (debugG) printf ("R %d PS %d reordered from %f to %f (PS %d)\n", cRound, id, oldKey, newKey, bestVs->rootPs->getId()); reorderSuccess++; return true; } return false;}/* * When k-choices is active, it calls this function to * reselect IDs and to create or delete VSs */int PhysicalServer::rechooseVs (int &vsDelta) { ASSERT (traceFP); setNextLBactionTime (); bool isOverload = false; if (util > upperSlack) { isOverload = true; if (capacity <= kchoicesActiveMinCapacity && util < 1) return 0; } else if (util < lowerSlack) { // fuggetabboutit if a low BW node is underloaded if (capacity <= kchoicesActiveMinCapacity) return 0; ; } else { // what are we doing here?? ASSERT (0); } rechooseAttempts++; rechooseAttemptStat++; //////////////////////////////////////////////////////////////////// // Create or Delete VSs if (rechooseAttempts > localVs.size()) { rechooseAttempts = 0; if (isOverload && localVs.size() > 1) { // these are being recorded in lb.cc rechooseDeleteStat++; if (recordingStats) { nodeStatRechooseDelete++; } haveDeletedVS = true; usedKeys.clear(); deleteVs (vsDelta); return 2; } else if (!isOverload && (!dampeningLimitCrDel || !haveDeletedVS)) { double targetWork = getExtraCapacity(); double predictedWork = 0.; rechooseCreateStat++; if (recordingStats) { nodeStatRechooseCreate++; } usedKeys.clear(); createVs (1,targetWork,predictedWork); return 1; } else { rechooseDidNotCreateDeleteStat++; return 0; } } //////////////////////////////////////////////////////////////////// // Move existing VSs to new IDs // bestDepCost depicts the change in mismatches // without us at that spot anymore. // If this is negative, the sum of mismatches has declined // we might have no where to move to at all if (localVs.size() == idChoice) { rechooseMaxVsSizeStat++; return 0; } set<VirtualServer*>::iterator bestVs = localVs.end(); double bestDepartureCost = -1; double departureGap; //遍历虚拟节点集合找最优的mismatch for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) { VirtualServer *vs = *p; usedKeys.insert (vs->getKey()); double thisDepartureGap; double thisDepartureCost = findDepartureCost (vs->getKey(),thisDepartureGap); if (bestVs == localVs.end() || thisDepartureCost < bestDepartureCost) { bestVs = p; bestDepartureCost = thisDepartureCost; departureGap = thisDepartureGap; } } VirtualServer *vs = *bestVs; double predictedWork = 0.; double desiredWork = getExtraCapacity() + vs->getWorkPerRound(); DEBUG_P (("R %d PS %d rechooseVs util %f (work %f) localVs %d, key %f departureCost %f desiredWork %f\n", cRound, id, util, getWorkPerRound(), localVs.size(), vs->getKey(), bestDepartureCost, desiredWork)); double newLocationCost; vector<double> unusedKeys = getSampleKeys(); if (unusedKeys.size() == 0) { rechooseOutOfKeysStat++; return 0; } double newLocationGap = 1.; //选择新的id double key = chooseKey (unusedKeys, desiredWork, predictedWork, vs->getKey(), newLocationCost, newLocationGap); // bestDepartureCost = [future mismatch] - [current mismatch] // newLocationCost = [future mismatch] - [current mismatch] if (bestDepartureCost+newLocationCost > -1.* epsilonImprovement) { rechooseWorseCostStat++; return 0; } DEBUG_P (("R %d PS %d rechooseVs util %f cur %f new %f curr %f des %f pred %f\n", cRound, id, util, vs->getKey(), key, getWorkPerRound(), desiredWork, predictedWork)); // it's red five, we're going in // relocate in vServers map<double,VirtualServer*>::iterator currentLocIter, newLocIter; currentLocIter = vServers.find(vs->getKey()); newLocIter = vServers.find(key); ASSERT (currentLocIter != vServers.end()); if (newLocIter != vServers.end()) { printf ("R %d PS %d aborting because oracle picked a key in use\n", cRound, id); return 0; } vServers.erase (currentLocIter); // clear fingers and tell it what its new id is double oldKey = vs->getKey(); vs->setKey (key); vServers.insert(pair<double,VirtualServer*>(key,vs)); // notify others gracefulRelocateVServers.insert (oldKey); // don't need to do anything to localVs // not doing merging as chooseKey won't have returned us a key where // we would be successor rechooseChangeIdStat++; if (recordingStats) { nodeStatRechooseChangeId++; } if (idChoice == 0) allKeys.insert(key); return 3;}void PhysicalServer::printLocalVs () { for (set<VirtualServer*>::iterator p = localVs.begin(); p != localVs.end(); p++) { VirtualServer *vs = *p; fprintf (traceFP, " %f", vs->getKey()); } fprintf (traceFP, "\n");}bool PhysicalServer::canTakeLBaction () { if (usingThreshold && nextLBactionTime < cRound) { if (cRound == birthTime) { thresholdLevel = 0; } else { int currentThresholdLevel = util2thresholdLevel (util); if (util > 1. || currentThresholdLevel > thresholdLevel) { thresholdShift = true; } thresholdLevel = currentThresholdLevel; } ASSERT (thresholdLevel >= 0); return thresholdShift; } if (nextLBactionTime < cRound) return true; return false;}//随机产生下一次均衡调度的时间void PhysicalServer::setNextLBactionTime () { if (usingThreshold) nextLBactionTime = cRound + 60; nextLBactionTime = cRound + (int)(poisson ((double)lbActionInterval));}//本虚拟节点的负责空间之和double VirtualServer::getArcLength () {#if 0 map<double,VirtualServer*>::iterator succ = vServers.upper_bound (key); if (succ == vServers.end()) { succ = vServers.begin(); } double succKey = succ->first; double arc = keyDistance (key, succKey);#else double arc = keyDistance (findPred(key)->getKey(), key);#endif return arc;}//物理节点的所有虚拟节点的负责空间之和double PhysicalServer::getArcLength () { if (!alive) return 0.; set<VirtualServer*>::iterator p; double arc = 0.; for (p = localVs.begin(); p != localVs.end(); p++) { arc += (*p)->getArcLength(); } return arc;}VirtualServer::VirtualServer (double k, PhysicalServer *ps) { key = k; previousKey = -1.; birthTime = cRound; workCurrent = 0; workHistory = -1.; rootPs = ps;//所在的物理节点 fingers = new Fingers ();//分配fingers 空间 predCache = NULL;}/* * VS::merge merges the histories of two virtual servers into one.仅仅把以前的工作情况合并 */void VirtualServer::merge (VirtualServer *oldGuy) { workHistory += oldGuy->getWorkPerRound(); if (oldGuy->getBirth() < birthTime) birthTime = oldGuy->getBirth();}VirtualServer::~VirtualServer () { delete fingers;}void PhysicalServer::incrMaintMsgOut (int mCount) { nodeStatMaintOut += mCount;}void PhysicalServer::incrMaintMsgIn (int mCount) { nodeStatMaintIn += mCount;}void PhysicalServer::incrQueryMsgOut (int mCount) { nodeStatQueryOut += mCount;}void PhysicalServer::incrQueryMsgIn (int mCount) { nodeStatQueryIn += mCount;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -