📄 lb.cc
字号:
break; ///------------------------------- case 'r'://?? recordingStats = true; sumStats->clear();// printf ("Recording stats.\n"); break; case 'w'://work shift -S shift workload to new distribution halfway through run ASSERT(!oracle); printf ("shifting workload\n"); delete keyDist; keyDist = distFactory->newDistribution (keyDistStr); if (keyDist == NULL) printUsage ("Problem with key distribution"); break; ///-------------------------------- case 'q'://结束整个事件发生 finished = true; break; } } //若当前轮为0, 则一次性合并同属于一个物理节点 //的虚拟节点 ASSERT (psUp > 0); if (!oracle && cRound == 0) { mergeVServers (); } #ifdef DEBUG if (debug) { printf ("relocated list\n"); for (set<double>::iterator p = gracefulRelocateVServers.begin(); p != gracefulRelocateVServers.end(); p++) { printf ("R %d relocated %f\n", cRound, *p); } } #endif /* *添加数据开始 */ map<double,VirtualServer*>::iterator source; VirtualServer *sVs; if(cRound%object_round==0) { for(int i=0;i<num_create;i++) { object=new Object(num_block);//数据产生 object->putObject();//数据插入到map //随机找一个源节点,数据散布 source= vServers.lower_bound (randPct()); sVs = source->second; object->publish(sVs,interimStats); } }/* *添加数据结束 */ /* * PER ROUND VIRTUAL SERVER CREATION, DELETION, SPLITTING, AND TRANSFER */ DEBUG_P (("\nVS ACTIONS: %d\n", cRound)); for (int i = 0; activeLBmethod != '-' && !oracle && cRound > earliestLBround && i < psCount; i++) { if (ps[i].isAlive()) {//该物理节点存在 if (ps[i].canTakeLBaction()) { int vsCurrentSize = vServers.size(); // transfer 方案 if (activeLBmethod == 't') {//transfer if (ps[i].isOverloaded ()) {// overload ? if (ps[i].getVsCount () == 1) {//仅有一个虚拟节点 // split can fail if we are adjacent to our neighbor if (ps[i].splitVs ()) { ASSERT (vServers.size() == vsCurrentSize+1); interimStats->splitVs(); } } else {//多个虚拟节点 ASSERT (ps[i].getVsCount () > 1); int vsDelta = 0; if (ps[i].transferVs (vsDelta)) { interimStats->transferVs(true); DEBUG_P (("successful transfer\n")); } else { interimStats->transferVs(false); } if (vServers.size() != vsCurrentSize + vsDelta) { ASSERT (vServers.size() == vsCurrentSize + vsDelta); } } } } //p方案 else if (activeLBmethod == 'p') {// if (ps[i].isOverloaded () && ps[i].getVsCount () > 1) { int vsDelta = 0; ps[i].deleteVs(vsDelta); ASSERT (vServers.size() == vsCurrentSize+vsDelta); interimStats->deleteVs(); } else if (ps[i].isUnderloaded() && ps[i].canAddVs(maxVsPerNode)) { DEBUG_P (("PS %d about to createVs\n", i)); double targetWork = ps[i].getExtraCapacity(); double predictedWork = 0.; if (ps[i].createVs (1,targetWork,predictedWork)) { ASSERT (vServers.size() == vsCurrentSize+1); } // increment for any kind of activity interimStats->createVs(); } } //k-choice 方案 else if (activeLBmethod == 'k') {//k-choice if (ps[i].isOverloaded () || ps[i].isUnderloaded()) { int vsDelta = 0; int actionCode = ps[i].rechooseVs (vsDelta); switch (actionCode) { case 0: break; case 1: interimStats->createVs(); break; case 2: interimStats->deleteVs(); break; case 3: interimStats->deleteVs(); interimStats->createVs(); break; default: ASSERT (0); } } } // g 方案 else if (activeLBmethod == 'g') {//g int vsDelta = 0; int actionCode = ps[i].thresholdVs (vsDelta); switch (actionCode) { case 0: break; case 1: // did neighbor adjust break; case 2: // did reorder interimStats->deleteVs(); interimStats->createVs(); break; default: ASSERT (0); } } } } } DEBUG_P (("R %d Start of finger updates\n", cRound)); /* * UPDATE FINGER POINTERS */ //本轮删除的所有虚拟节点 deadVServersCountVector.push_back ((double)(deadVServers.size())); //本轮重新安排的虚拟节点 gracefulVServersCountVector.push_back ((double)(gracefulRelocateVServers.size())); if (!oracle || (oracle && cRound == 0)) { for (map<double,VirtualServer*>::iterator p = vServers.begin(); p != vServers.end(); p++) {//依次对各个虚拟节点的fingers 修复 int msgCount = p->second->fixFingers (); interimStats->maintMsg (msgCount);//发送的维护消息计数 } deadVServers.clear (); gracefulRelocateVServers.clear (); } DEBUG_P (("R %d End of finger updates\n", cRound)); /* * PER ROUND QUERIES */ DEBUG_P (("QUERIES\n")); //产生每轮发生查询的数量 int queryCount = (int)(ceil(psUp * queriesPerRound));//queriesPerRound 由启动参数指定 //产生查询的起始的虚拟节点 map<double,VirtualServer*>::iterator p = vServers.lower_bound (randPct()); //按分布产生的查询目标集合 vector<double> dstKeys; dstKeys.reserve (queryCount);//分配空间 for (int i = 0; i < queryCount; i++) { double dstKey = keyDist->next();//产生各个目标key dstKeys.push_back (dstKey); } int queryIndex = -1; int querySuccessCount = 0; //int ObjectFailedCount=0; //int ObjectSuccessCount=0; deque<bool> previousQuerySuccess; initializePreviousQuerySuccess (previousQuerySuccess); //对每个查询的目标进行路由 //发出查询的源虚拟节点随机产生 for (vector<double>::iterator q = dstKeys.begin(); q != dstKeys.end(); q++) { queryIndex++; double dstKey = *q;//本次查询的目标id ASSERT (dstKey >= 0. && dstKey < 1.); DEBUG_P (("Starting query for key %f\n",dstKey)); // iterate p p++; if (p == vServers.end()) p = vServers.begin(); VirtualServer *srcVs = p->second;//发出查询的源虚拟节点 //***************************************************************************** //******查询object,现在map中查找是否存在, //Object *o; //totalcount++; map<double,Object*>::iterator oit=mObject.lower_bound(dstKey); Object *o; if(oit!=mObject.end())//查询的object不存在 { o=oit->second; //printf("success1\n"); if(o->search(srcVs, M,interimStats)) { ObjectSuccessCount++; //printf("success22\n"); } else ObjectFailCount++; continue; } //printf("faild \n"); //ObjectFailCount++; //***************************************************************************** vector<double> hops;//记录经过的虚拟节点 vector<int> fingersUsed;// //查询的路由过程 //同时记录了查询的经过 bool unlimitedRouteSuccess = srcVs->findHopDistance (dstKey, hops, fingersUsed, -1.); if (0 && hops.size() > 20) {//这里0 是调试用的 printf ("srcVs %f dstVs %f\n", srcVs->getKey(), dstKey); int i = 0; for (vector<double>::iterator h = hops.begin(); h != hops.end(); h++) { printf ("hop %d %f finger %d\n", i, *h, fingersUsed[i]); i++; } } //统计记录 interimStats->hopSrcDst (unlimitedRouteSuccess, hops.size()-1); hops.clear(); fingersUsed.clear(); //路由查询的过程 bool routeSuccess = srcVs->route (dstKey, hops, fingersUsed, true); int hopCount = hops.size()-1; if (hopCount < 0) hopCount = 0; if (routeSuccess) {//路由成功 DEBUG_P (("R %d query success for key %f in hops %d\n", cRound, dstKey, hopCount)); interimStats->query (true, hopCount); querySuccessCount++; } else {//路由失败 DEBUG_P (("R %d query fail for key %f\n",cRound,dstKey)); interimStats->query (false, hopCount); } interimStats->queryMsg (hopCount); #ifdef PER_QUERY_STATS //调试用代码 // per query stats if (cRound == 60) { double pctRingOK = findPctRingUnderCapacity (ps); double hopsEstimate = .5 * log2(psUp); double qSEstimate = pow (pctRingOK, hopsEstimate); double runningAvgqS = 0; double successValue = 0.; if (routeSuccess) successValue = 1.; runningAvgqS = querySuccessCount/(double)(queryIndex+1); addToPreviousQuerySuccess (previousQuerySuccess,routeSuccess); double recentTen = avgOfPreviousQuerySuccess (previousQuerySuccess,10); double recentTwenty = avgOfPreviousQuerySuccess (previousQuerySuccess,20); fprintf (traceFP, "q %d %f real %f est %f ten %f twenty %f pctOK %f\n", queryIndex, successValue, runningAvgqS,qSEstimate,recentTen,recentTwenty,pctRingOK); } #endif } IF_DEBUG (printPs (ps,stdout);) /* * RECORD END OF ROUND */ int workThisRound = 0; for (int i = 0; i < psCount; i++) { workThisRound += ps[i].endOfRound (); } interimStats->psUp (psUp); interimStats->vsUp (vServers.size()); cRound++;//下一轮from 0 if (cRound % summaryFrequency == 0) {//每经过一定的轮数,则记录 if (recordingStats) sumStats->add (interimStats); fprintf (runLogFP, "R %d ", cRound); fprintf (stdout, "R %d ", cRound); interimStats->print (runLogFP); interimStats->print (stdout); interimStats->clear (); printUtilHistogram (runLogFP,ps); printServerStats (runLogFP,ps);//vservers printFingerStats (runLogFP);//fingers } } // END OF ROUNDS /* * PRINT STATS */ // printLinks must come before printPs as it adds up the msgCounts sumStats->add (interimStats); linksFP = openOutputFile (outputPrefix, ".link"); summaryFP = openOutputFile (outputPrefix, ".sum"); nodeFP = openOutputFile (outputPrefix, ".node"); fprintf (summaryFP, "lF %s lA %d cF %s cA %d q %5.2f k %c %4.3f i %d v %d L %4.3f U %4.3f a %c %d s %d ", churnFile, lifetimeAverage, capacityFile, capacityAverage, queriesPerRound, keyDistStr[0], keyDist->getMean(), idChoice, initialVsPerNode, lowerSlack, upperSlack, activeLBmethod, methodToCode (idChoice, initialVsPerNode, activeLBmethod), seed); totalcount=ObjectSuccessCount+ObjectFailCount; fprintf(summaryFP,"\n\nTotal Query %d:success %d,fail %d\n\n",totalcount,ObjectSuccessCount,ObjectFailCount); sumStats->print (summaryFP); printLinks (ps, linksFP, false); printPs (ps, nodeFP); // printVs (traceFP); // destroy PS and route tables printf ("Shutting down.\n"); shuttingDown = true; // deleteNodes (ps); delete keyDist; closeChurnFile (); delete interimStats; delete sumStats; delete distFactory;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -