📄 server.cc~
字号:
/* $Id: server.cc,v 1.58 2005/08/02 19:45:32 jonathan Exp $ * Jonathan Ledlie, Harvard University. * Copyright 2005. All rights reserved. */#include <stdio.h>#include <stdlib.h>#include <iostream>#include "math_util.h"#include "error.h"#include "server.h"#include "lb.h"#include "object.h"double epsilonImprovement = 0.25;double kchoicesActiveMinCapacity = 10.;bool range=true;bool debugG = false;int lookupCount = 0;int duplicateHop = 0;int increasingFingerStat = 0;//为了系统稳定起见,经过400s 后才均衡bool warnedAboutEarliestRound = false;const int earliestLBround = 400;//经过400s 后才进行均衡处理const double thresholdDelta = 8.;const double logOfThresholdDelta = log(thresholdDelta);const double thresholdInitialConstant = 0.01;int thresholdSampleSize = 0;bool usingThreshold = false;int thresholdAttempt = 0;int neighborAdjustSuccess = 0;int reorderAttempt = 0;int reorderSuccess = 0;int transferMsgSendFail = 0;int transferNoCapacityFail = 0;int transferNoVsMatchFail = 0;int transferSuccess = 0;int transferAttempt = 0;int rechooseAttemptStat = 0;int rechooseChangeIdStat = 0;int rechooseCreateStat = 0;int rechooseDeleteStat = 0;int rechooseDidNotCreateDeleteStat = 0;int rechooseOutOfKeysStat = 0;int rechooseMaxVsSizeStat = 0;int rechooseWorseCostStat = 0;bool rechooseFlag = false;int max_best_keys_size = 0;int maxHops = 0; //记录路由经过的最大hopsset<double> allKeys; // 所有的使用的id 集合vector<double> statKeyListSize;bool shuttingDown;bool recordingStats;//需要记录状态set<double> deadVServers;set<double> gracefulRelocateVServers;//负载均衡中调整的虚拟节点结合map<double,VirtualServer*> vServers;//所有的虚拟节点集合map<double,VirtualServer*>tempdeadvServers;//暂时失效的虚拟节点double systemTargetCapacity;double currentTargetCapacity;int lbActionInterval = 30;int psUp = 0;//当前存在的物理节点数量int psCount = -1;//物理节点的数量int scoreCountStat = 0;int scorePredictedWorkStat = 0;int scoreBetterScoreStat = 0;int scoreBetterDistanceStat = 0;int routeCountStat = 0;//路由经过的hops 计数int routeSendMsgFailedStat = 0;int routeNextHopIsThisStat = 0;int routeNullFingerStat = 0;int routeSendMsgFailedDist[5];bool initializedRouteSendMsgFailedDist = false;//产生一个尚未使用的key (id)double getRandomUnusedKey () { bool goodId = false; double id; while (!goodId) { goodId = true; id = randPct(); if (allKeys.find(id) != allKeys.end()) goodId = false; } return id;}double keyDistance (double a, double b) { if (a <= b) { return b - a; } return b - a + 1.;}double distToSuccessor (VirtualServer *vs) { double id = vs->getKey(); map<double,VirtualServer*>::iterator p; p = vServers.lower_bound (id); ASSERT (p != vServers.end()); p++; double wrapped = 0.; if (p == vServers.end()) { p = vServers.begin(); wrapped = 1.; } double nextKey = p->second->getKey(); return nextKey - id + wrapped;}/* * findDst returns the VS responsible for a key (the clockwise VS). * Also used to find a node's successor返回负责该key 的虚拟节点也用来找一个节点的后继节点 */VirtualServer* findDst (double key) { map<double,VirtualServer*>::iterator p; p = vServers.upper_bound (key); if (p == vServers.end()) { p = vServers.begin(); } return p->second;}/* * findPred returns the VS of the predesessor of the node at this key */VirtualServer* findPred (double key) { map<double,VirtualServer*>::iterator p; p = vServers.lower_bound (key); if (p == vServers.end()) { p = vServers.begin(); } if (p == vServers.begin()) { p = vServers.end(); } p--; return p->second;}/* * mergeVServers is called exactly once after the initial node * births. This allows nodes to choose IDs at random for all of * their VSs and then have them merged in one fell swoop. * 对初始时刻创建的虚拟节点一次性合并 * */void mergeVServers () { map<double,VirtualServer*>::iterator succ, pred; pred = vServers.begin();//依次从vServer 取各个虚拟节点 bool reachedEnd = false; while (!reachedEnd && pred != vServers.end()) { succ = pred; succ++; if (succ == vServers.end()) { reachedEnd = true; succ = vServers.begin(); } PhysicalServer *psPred = pred->second->getRootPs(); PhysicalServer *psSucc = succ->second->getRootPs(); if (psPred == psSucc) { DEBUG_P (("PS %d merging %f into %f\n", psSucc->getId(), pred->first, succ->first)); int vsDelta = 0; psPred->deleteLocalVs (pred, vsDelta); } else { ; } pred = succ; }}/* * optKey and score are highly related. * * optKey looks at the gap between each pair of consecutive * VSs and finds the best key/score in that gap. It returns * the key with the best score. * * score takes a single potential key and the amount of work that * the PS would like the VS under formation to perform and returns * the score for this particular spot, based on the successor, * the size of the gap, etc. * * optCost computes the minimum cost given target work a, target work b, * current work w. It also returns upper and lower bounds for pct * of range that should be given to node a to achieve this cost. * */double PhysicalServer::findOptCost (double a, double s, double w, double &min_r, double &max_r) { double cost = -1; if (s <= 0 && a >= w) { printf ("case s<=0 and a>=w\n"); min_r = 1.; max_r = 1.; cost = a - s - w; } else if (a <= 0 && s >= w) { printf ("case s<=0 and a>=w\n"); min_r = 0.; max_r = 0.; cost = w - s - a; } else if (a + s < w) { printf ("case a + s < w\n"); cost = w - a - s; min_r = a/w; max_r = 1.-s/w; } else { // if (debug) printf ("case a + s >= w\n"); ASSERT (a+s>=w); cost = a + s - w; min_r = 1.-s/w; max_r = a/w; } cost -= fabs (s-w); printf ("a %f s %f w %f cost %f\n", a, s, w, cost); return cost;}double PhysicalServer::findOptKey (double desired_work, double &best_predicted_work, double &best_cost, double &best_gap) { map<double,VirtualServer*>::iterator succ, pred; best_cost = 0.; bool first_time_flag = true; double ourDesiredWork = getExtraCapacity(); set<OptKeyDesc*> best_keys; printf ("PS %d findOptKey\n", id); for (succ = vServers.begin(); succ != vServers.end(); succ++) { pred = succ; if (succ == vServers.begin()) pred = vServers.end(); pred--; ASSERT (pred->first >= 0. && pred->first < 1.); ASSERT (succ->second != NULL); printf ("pred %f succ %f succ-id %d succ-cap %f\n", pred->first, succ->first, succ->second->getRootPs()->getId(), succ->second->rootPs->getCapacity()); if (succ->second->getRootPs() == this) { DEBUG_P (("PS %d skipping ourselves in optKey\n", id)); continue; } double key_pred = pred->first; double key_succ = succ->first; double gap = keyDistance (key_pred,key_succ); double succDesiredWork = succ->second->getRootPs()->getExtraCapacity() + succ->second->getWorkPerRound(); double succCurrentWork = succ->second->getWorkPerRound(); double succCapacity = succ->second->rootPs->getCapacity(); if (succCurrentWork < 0.) { succCurrentWork = 0.; } //desired_work/ourCapacity double min_r, max_r; double cost = findOptCost (ourDesiredWork, succDesiredWork, succCurrentWork, min_r, max_r); printf ("cost %f min_r %f max_r %f\n", cost, min_r, max_r); if (first_time_flag || cost <= best_cost) { first_time_flag = false; ////////////////////////// // assign a key // give a little leeway if (max_r > .05) max_r = .95; if (min_r < .05) min_r = .05; double r = (max_r - min_r) / 2. + min_r; ASSERT (r > 0 && r < 1); double key = key_pred + r*gap; if (key >= 1.) key -= 1.; if (vServers.find(key) == vServers.end()) { double predicted_work = r * succCurrentWork; OptKeyDesc *desc = new OptKeyDesc (key, key_succ, key_pred, predicted_work, cost, gap); if (cost < best_cost) { printf ("clearing keys\n"); best_keys.clear (); } printf ("tentative key %f\n", key); best_keys.insert (desc); } } printf ("\n\n"); } if (best_keys.size() > max_best_keys_size) { max_best_keys_size = best_keys.size(); printf ("R %d max_best_keys_size %d\n", cRound, max_best_keys_size); } int index = unifRand(0,best_keys.size()); set<OptKeyDesc*>::iterator p = best_keys.begin(); for (int i = 0; i < index; i++) { p++; } ASSERT (p != best_keys.end()); OptKeyDesc *opt = *p; ASSERT (opt); double best_key = opt->key; best_predicted_work = opt->predicted_work; best_cost = opt->cost; best_gap = opt->gap; ASSERT (best_key >= 0. && best_key < 1.); printf ("using key %f\n", best_key); return best_key;}double PhysicalServer::findDepartureCost (double ourKey, double &gap) { map<double,VirtualServer*>::iterator ourIter, succIter, predIter; VirtualServer *ourVs, *succVs, *predVs; double ourDesiredWork; double succDesiredWork, succCurrentWork, succFutureWork; double cost; ourIter = vServers.find (ourKey); ASSERT (ourIter != vServers.end()); predIter = ourIter; if (ourIter == vServers.begin()) predIter = vServers.end(); predIter--; succIter = ourIter; succIter++; if (succIter == vServers.end()) succIter = vServers.begin(); ourVs = ourIter->second; predVs = predIter->second; succVs = succIter->second; ourDesiredWork = getExtraCapacity() + ourVs->getWorkPerRound(); succDesiredWork = succVs->getRootPs()->getExtraCapacity() + succVs->getWorkPerRound(); succCurrentWork = succVs->getWorkPerRound(); if (succCurrentWork < 0.) succCurrentWork = 0.; succFutureWork = ourVs->getWorkPerRound() + succVs->getWorkPerRound(); DEBUG_P (("R %d PS %d departureCost: us %f (work curr %f des %f) succ %f (work curr %f des %f fut %f)\n", cRound, id, ourVs->getKey(), ourVs->getWorkPerRound(), ourDesiredWork, succVs->getKey(), succCurrentWork, succDesiredWork, succFutureWork)); // normalized to capacity cost = fabs(succFutureWork-succDesiredWork)/succVs->rootPs->getCapacity() - fabs(ourVs->getWorkPerRound()-ourDesiredWork)/getCapacity() - fabs(succCurrentWork-succDesiredWork)/succVs->rootPs->getCapacity(); gap = keyDistance (predVs->getKey(), succVs->getKey()); return cost;}double PhysicalServer::findScore (double ourKey, double ourDesiredWork, double &ourFutureWork, double &gap) { map<double,VirtualServer*>::iterator succ, pred; double ourDomain, succNewDomain, succCurrentDomain; double predKey, succKey; double succDesiredWork, succCurrentWork, succFutureWork; double cost, pctDistance; succ = vServers.upper_bound (ourKey); pred = succ; if (succ == vServers.end()) { succ = vServers.begin(); } if (pred == vServers.begin()) { pred = vServers.end(); } pred--; predKey = pred->first; succKey = succ->first; succDesiredWork = succ->second->getRootPs()->getExtraCapacity() + succ->second->getWorkPerRound(); succCurrentWork = succ->second->getWorkPerRound(); if (succCurrentWork < 0.) succCurrentWork = 0.; ourDomain = keyDistance (predKey, ourKey); succCurrentDomain = keyDistance (predKey, succKey); succNewDomain = keyDistance (ourKey, succKey); gap = succCurrentDomain; pctDistance = ourDomain / succCurrentDomain; succFutureWork = (1.-pctDistance)*succCurrentWork; ourFutureWork = pctDistance*succCurrentWork; cost = fabs(succFutureWork-succDesiredWork)/succ->second->rootPs->getCapacity() + fabs(ourFutureWork-ourDesiredWork)/getCapacity() - fabs(succCurrentWork-succDesiredWork)/succ->second->rootPs->getCapacity();#ifdef DEBUG if (debug) { printf ("R %d PS %d psUp %d\n", cRound, id, psUp); printf ("predKey %f ourKey %f (pctDist %f) succKey %f\n", predKey, ourKey, pctDistance,succKey); printf ("succ work: actual: %f desired %f\n", succCurrentWork, succDesiredWork); printf ("our work: guess: %f desired %f\n", ourFutureWork, ourDesiredWork); printf ("succ work: guess: %f desired %f\n", succFutureWork, succDesiredWork); printf ("cost %f\n", cost); }#endif ASSERT (pctDistance <= 1. && pctDistance >= 0.); return cost;}PhysicalServer::PhysicalServer () { if (!initializedRouteSendMsgFailedDist) { routeSendMsgFailedDist[0] = 0; routeSendMsgFailedDist[1] = 0; routeSendMsgFailedDist[2] = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -