📄 server.cc
字号:
void PhysicalServer::print (FILE *fp) { double avgUtil = 0.; double avgArc = 0.; double avgVsCount = 0.; if (nodeStatUptime > 0) { avgUtil = (nodeStatTotalWork/(double)nodeStatUptime)/capacity; avgArc = nodeStatArc/nodeStatUptime; avgVsCount = nodeStatVsCount/(double)nodeStatUptime; } double avgTransferSuccess = 0.; if (nodeStatTransferAttempt > 0) { avgTransferSuccess = nodeStatTransferSuccess/(double)avgTransferSuccess; } set<VirtualServer*>::iterator p = localVs.begin(); double key = -1.; if (p != localVs.end()) { VirtualServer *vs = *p; key = vs->getKey(); } fprintf (fp, "%-4d cap %5d w %5d up %5d util %8f maxUtil %8f vs %4.1f arc %8f cr %4d del %4d kCr %4d kDel %4d kM %4d spl %4d trA %4d trS %3.2f inQ %5d outQ %5d inM %5d outM %5d key %f\n", id, (int)capacity, nodeStatTotalWork, nodeStatUptime, avgUtil, maxUtil, avgVsCount, avgArc, nodeStatCreate, nodeStatDelete, nodeStatRechooseCreate, nodeStatRechooseDelete, nodeStatRechooseChangeId, nodeStatSplit, nodeStatTransferAttempt, avgTransferSuccess, nodeStatQueryIn, nodeStatQueryOut, nodeStatQueryIn, nodeStatMaintOut, key);}/* * PS::getExtraCapacity returns the difference between * the avg target threshold and the historical average work * of this node's VSs. */double PhysicalServer::getExtraCapacity () { return (double)(upperTarget+lowerTarget)/2. - getWorkPerRound();}/* * PS::canAddVs returns true if this node believes it can fit * one more VS of average work beneath its upper target threshold. */bool PhysicalServer::canAddVs (int maxVsPerNode) { double avgWork = getWorkPerRound(); if (localVs.size() >= maxVsPerNode) { return false; } if ((double)(upperTarget) > avgWork + avgWork/(double)localVs.size()) { return true; } return false;}/* * PS::getWorkPerRound finds the historical workload of the combined * VSs on this node. 所有虚拟节点的负载之和 */double PhysicalServer::getWorkPerRound () { set<VirtualServer*>::iterator p; double avgWork = 0.; for (p = localVs.begin(); p != localVs.end(); p++) { double vsWork = (*p)->getWorkPerRound(); if (vsWork > 0.) avgWork += vsWork; } return avgWork;}//虚拟节点的工作double VirtualServer::getWorkPerRound () { return workHistory;}/* * Returns the percentage of time overloaded over time alive. * Note that roundsUp and roundsOverloaded are only incremented * when recordingStats is true, which happens the second half of * each trial. *//* double PhysicalServer::getPctRoundsOverloaded () { if (nodeStatUptime == 0) return 0.; double pct = (double)roundsOverloaded / (double)roundsUp; return pct; }*//* * PS::getWorkToCapacityRatio returns the historical workload * on this node. */double PhysicalServer::getUtil () { return util;}double PhysicalServer::getMaxUtil () { return maxUtil;}//void VirtualServer::putBlock(double block){ mBlock.insert(block);}bool VirtualServer::route (double dstKey, vector<double> &hops, vector<int> &fingersUsed, bool isQuery) { lookupCount++; return route (dstKey, hops, fingersUsed, isQuery, -1);}bool VirtualServer::route (double dstKey, vector<double> &hops, vector<int> &fingersUsed, bool isQuery, double dstVsKeyCache) { if (hops.size() > maxHops) {//记录路由所经过的最多hops maxHops = hops.size(); } //缓存路由目标的虚拟节点 if (dstVsKeyCache < 0.) { VirtualServer *dstVs = findDst (dstKey); dstVsKeyCache = dstVs->getKey(); } DEBUG_P (("PS %d VS %f query dstkey %f\n", rootPs->getId(), key, dstKey)); //路由成功结束//key == dstVsKeyCache if (key == dstVsKeyCache) { DEBUG_P (("PS %d VS %f is dst for query of key %f\n", rootPs->getId(), key, dstKey)); return true; } DEBUG_P (("PS %d VS %f is not dst vs for key %f, %f is\n", rootPs->getId(), key, dstKey, dstVsKeyCache)); routeCountStat++; //查询本身的fingers ,找最相近的finger int fingerIndex = -1; double fingerKey = fingers->findFinger(key,dstKey,fingerIndex,dstVsKeyCache); map<double,VirtualServer*>::iterator fingerIter = vServers.find (fingerKey); VirtualServer *nextHop = NULL; if (fingerIter != vServers.end()) { nextHop = fingerIter->second; } if (!nextHop) {//找不到最相近的finger,没找到结束 DEBUG_P (("PS %d VS %f query finger pointed to dead node\n", rootPs->getId(), key)); routeNullFingerStat++; } if (nextHop == this) {//恰好自身,没找到结束 // can happen when network is really small routeNextHopIsThisStat++; printf ("Unusual event: PS %d VS %f not forwarding to ourselves\n", rootPs->getId(), key); return false; } if (nextHop && nextHop != this) {//下一hop 节点存在且不是自身 if (nextHop->sendMsg (this,isQuery)) { fingers->heartbeat (fingerIndex); DEBUG_P (("PS %d VS %f using finger %d to %f hop %d for query of %f\n", rootPs->getId(), key, fingerIndex, nextHop->getKey(), hops.size(), dstKey)); bool revisit = false; int minFinger = 100; for (int i = 0; i < hops.size(); i++) { if (hops[i] == key) { revisit = true; } if (fingersUsed[i] < minFinger) { minFinger = fingersUsed[i]; } } if (fingerIndex > minFinger) { increasingFingerStat++; return false; } if (revisit) { printf ("R %d dst %f\n", cRound, dstKey); for (int i = 0; i < hops.size(); i++) { printf ("%d hop %f finger %d\n", i, hops[i], fingersUsed[i]); } printf ("current %f", key); printf ("next hop %f", nextHop->getKey()); duplicateHop++; return false; } hops.push_back(key);//记录经过该虚拟节点 fingersUsed.push_back (fingerIndex);//记录使用的finger 的序号 return nextHop->route (dstKey, hops, fingersUsed, isQuery, dstVsKeyCache); } else { routeSendMsgFailedStat++; routeSendMsgFailedDist[nextHop->getRootPs()->getCapacityIndex()]++; } } DEBUG_P (("PS %d VS %f query key %f failed\n", rootPs->getId(), key, dstKey)); return false;}bool VirtualServer::proute (double dstKey, vector<double> &hops, vector<int> &fingersUsed, bool isQuery) { lookupCount++; return proute (dstKey, hops, fingersUsed, isQuery, -1);}bool VirtualServer::proute (double dstKey, vector<double> &hops, vector<int> &fingersUsed, bool isQuery, double dstVsKeyCache) { if (hops.size() > maxHops) {//记录路由所经过的最多hops maxHops = hops.size(); } //缓存路由目标的虚拟节点 double start,end; getRange(dstKey, start, end); VirtualServer *dstVs; if (dstVsKeyCache < 0.) { if(!range) dstVs = findDst (dstKey); else dstVs = findDst (end); dstVs = findDst (dstKey); dstVsKeyCache = dstVs->getKey(); } DEBUG_P (("PS %d VS %f query dstkey %f\n", rootPs->getId(), key, dstKey)); //路由成功结束//key == dstVsKeyCache if(!range&&(key == dstVsKeyCache)&&mBlock.count(dstKey)){ DEBUG_P (("PS %d VS %f is dst for query of key %f\n", rootPs->getId(), key, dstKey)); return true; } //double start,end; //getRange(dstKey, start, end); if (range&&between(key, start, end)&&mBlock.count(dstKey)) { DEBUG_P (("PS %d VS %f is dst for query of key %f\n", rootPs->getId(), key, dstKey)); return true; } DEBUG_P (("PS %d VS %f is not dst vs for key %f, %f is\n", rootPs->getId(), key, dstKey, dstVsKeyCache)); routeCountStat++; //查询本身的fingers ,找最相近的finger int fingerIndex = -1; double fingerKey = fingers->findFinger(key,dstKey,fingerIndex,dstVsKeyCache); map<double,VirtualServer*>::iterator fingerIter = vServers.find (fingerKey); VirtualServer *nextHop = NULL; if (fingerIter != vServers.end()) { nextHop = fingerIter->second; } if (!nextHop) {//找不到最相近的finger,没找到结束 DEBUG_P (("PS %d VS %f query finger pointed to dead node\n", rootPs->getId(), key)); routeNullFingerStat++; } if (nextHop == this) {//恰好自身,没找到结束 // can happen when network is really small routeNextHopIsThisStat++; printf ("Unusual event: PS %d VS %f not forwarding to ourselves\n", rootPs->getId(), key); return false; } if (nextHop && nextHop != this) {//下一hop 节点存在且不是自身 if (nextHop->sendMsg (this,isQuery)) { fingers->heartbeat (fingerIndex); DEBUG_P (("PS %d VS %f using finger %d to %f hop %d for query of %f\n", rootPs->getId(), key, fingerIndex, nextHop->getKey(), hops.size(), dstKey)); bool revisit = false; int minFinger = 100; for (int i = 0; i < hops.size(); i++) { if (hops[i] == key) { revisit = true; } if (fingersUsed[i] < minFinger) { minFinger = fingersUsed[i]; } } if (fingerIndex > minFinger) { increasingFingerStat++; return false; } if (revisit) { printf ("R %d dst %f\n", cRound, dstKey); for (int i = 0; i < hops.size(); i++) { printf ("%d hop %f finger %d\n", i, hops[i], fingersUsed[i]); } printf ("current %f", key); printf ("next hop %f", nextHop->getKey()); duplicateHop++; return false; } hops.push_back(key);//记录经过该虚拟节点 fingersUsed.push_back (fingerIndex);//记录使用的finger 的序号 return nextHop->route (dstKey, hops, fingersUsed, isQuery, dstVsKeyCache); } else { routeSendMsgFailedStat++; routeSendMsgFailedDist[nextHop->getRootPs()->getCapacityIndex()]++; } } DEBUG_P (("PS %d VS %f query key %f failed\n", rootPs->getId(), key, dstKey)); return false;}//利用叠代的方法进行查找过程//从源到目标之间经过的hop 数//和经过的fingersbool VirtualServer::findHopDistance (double dstKey, vector<double> &hops, vector<int> &fingersUsed, double dstVsKeyCache) { //一开始时,缓存该dstKey 的虚拟节点的key if (dstVsKeyCache < 0.) {//负责dstKey的虚拟节点 VirtualServer *dstVs = findDst (dstKey); dstVsKeyCache = dstVs->getKey(); } //到达目标虚拟节点 if (key == dstVsKeyCache) {//若本节点恰好为要找的虚拟节点,则结束查询 return true;//找到目标 } //从该节点的fingers 中查找最近的节点 int fingerIndex = -1; double fingerKey = fingers->findFinger(key,dstKey,fingerIndex,dstVsKeyCache); map<double,VirtualServer*>::iterator fingerIter = vServers.find (fingerKey); VirtualServer *nextHop = NULL; if (fingerIter != vServers.end()) {//找到 nextHop = fingerIter->second; } if (nextHop == this) {//回到自身,找不到 // can happen when network is really small return false; } if (nextHop && nextHop != this) {//进一步查找 hops.push_back(key);// fingersUsed.push_back (fingerIndex); return nextHop->findHopDistance (dstKey, hops, fingersUsed, dstVsKeyCache); } return false;}bool VirtualServer::sendMsg (VirtualServer *sender, bool isQuery) { workCurrent++; return rootPs->sendMsg (sender->getRootPs()->getId(), isQuery);}bool VirtualServer::sendMsg (int senderId, bool isQuery) { workCurrent++; return rootPs->sendMsg (senderId, isQuery);}bool PhysicalServer::sendMsg (int senderId, bool isQuery) { if (!alive) { ASSERT (alive); } ASSERT (senderId >= 0 && senderId < psCount); if (recordingStats) { if (isQuery) queryMsg[senderId][id]++;//查询消息 else maintMsg[senderId][id]++;//维护消息 } if (work >= capacity) {//若该物理节点过载,则不发送消息 DEBUG_P (("R %d PS %d got a packet but is overloaded work %d\n", cRound, id, work)); work++; return false; } DEBUG_P (("R %d PS %d sendMsg work %d\n", cRound, id, work)); work++; return true;}//alive statutsbool PhysicalServer::isAlive () { return alive;}//void VirtualServer::setKey (double k) { key = k; workHistory = -1.;}OptKeyDesc::OptKeyDesc(double k, double s, double p, double p_work, double c, double g) { key = k; succ = s; pred = p; predicted_work = p_work; cost = c; gap = g;}OptKeyDesc::~OptKeyDesc () {}double findPctRingUnderCapacity (PhysicalServer *ps) { double psArcs[psCount]; for (int i = 0; i < psCount; i++) { psArcs[i] = 0.; } for (map<double,VirtualServer*>::iterator walk = vServers.begin(); walk != vServers.end(); walk++) { map<double,VirtualServer*>::iterator nextStep = walk; nextStep++; if (nextStep == vServers.end()) nextStep = vServers.begin(); double distance = keyDistance (walk->first,nextStep->first); psArcs[nextStep->second->rootPs->getId()] += distance; } double okGapPct = 0.; for (int i = 0; i < psCount; i++) { if (ps[i].isAlive()) { if (ps[i].getWork() <= ps[i].getCapacity()) { okGapPct += psArcs[i]; } } else { ASSERT (psArcs[i] == 0.); } } ASSERT (okGapPct >= 0. && okGapPct <= 1.); return okGapPct;}void printServerStats (FILE *fp, PhysicalServer *ps) { if (!warnedAboutEarliestRound && earliestLBro
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -