📄 query.cpp
字号:
meridProcess->getRings()->eraseNode(*it); badNodes.push_back(*it); } } for (u_int i = 0; i < badNodes.size(); i++) { removeCandidateNode(badNodes[i]); } performReplacement(); finished = true; return 0;}double RingManageQuery::getVolume(coordT* points, int dim, int numpoints){ boolT ismalloc= False; /* True if qhull should free points in qh_freeqhull() or reallocation */ char flags[250]; /* option flags for qhull, see qh_opt.htm */ FILE *outfile= NULL; /* output from qh_produce_output() use NULL to skip qh_produce_output() */ FILE *errfile= NULL; /* error messages from qhull code */ int curlong, totlong; /* memory remaining after qh_memfreeshort */ /* Write these to NULL, we don't want to keep them */ outfile = fopen("/dev/null", "r+"); errfile = fopen("/dev/null", "r+"); /* Run 1: convex hull */ sprintf(flags, "qhull FA"); int exitcode= qh_new_qhull (dim, numpoints, points, ismalloc, flags, outfile, errfile); double returnVolume = 0.0; if (exitcode == 0) { returnVolume = qh totvol; } else { ERROR_LOG("Bad exit code for qhull hypervolume computation\n"); } fclose(outfile); fclose(errfile); qh_freeqhull(!qh_ALL); /* free long memory */ qh_memfreeshort (&curlong, &totlong); /* free short memory and memory allocator */ if (curlong || totlong) { ERROR_LOG_2("qhull internal warning (user_eg, #1): " "did not free %d bytes of long memory " "(%d pieces)\n", totlong, curlong); } return returnVolume;}/* Calculates the hypervolume from the latency matrix*/double RingManageQuery::calculateHV( const int N, // Physical size of the latencyMatrix const int NPrime, // Size of the latencyMatrix in use double* latencyMatrix) // Pointer to latencyMatrix{ /* Time to perform Gram Schmidt to reduce the dimension by 1 */ GramSchmidtOpt gs(NPrime); /* tmpModMatrix is the latencyMatrix where every row subtracts the last row in the matrix (and the last row is all 0) */ double* tmpModMatrix = (double*)malloc(sizeof(double) * NPrime * NPrime); for (int i = 0; i < NPrime - 1; i++) { // Copy from latencyMatrix to tmpModMatrix cblas_dcopy(NPrime, &latencyMatrix[i * N], 1, &tmpModMatrix[i * NPrime], 1); // Perform the subtraction cblas_daxpy(NPrime, -1.0, &latencyMatrix[(NPrime-1) * N], 1, &tmpModMatrix[i * NPrime] , 1); gs.addVector(&tmpModMatrix[i * NPrime]); } /* Zero out last row explictly */ for (int i = 0; i < NPrime; i++) { tmpModMatrix[(NPrime-1) * NPrime + i] = 0.0; } /* Retrieve the orthogonal matrix that has been computed */ int orthSize; double* orthMatrix = gs.returnOrth(&orthSize); /* Now re-create the latency matrix based on the dot product */ coordT* latencyMatrixMod = (double*) malloc(sizeof(double) * orthSize * NPrime); cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasTrans, NPrime, orthSize, NPrime, 1.0, tmpModMatrix, NPrime, orthMatrix, NPrime, 0.0, latencyMatrixMod, orthSize); free(tmpModMatrix); // No no longer useful, can delete /* Let's get the hypervolume */ double hyperVolume = getVolume(latencyMatrixMod, orthSize, NPrime); free(latencyMatrixMod); // Done, free memory of intermediate structure return hyperVolume;}/* Used in diverse set formation, it reduces at set of nodes by N nodes, where the remaining nodes have the approximately highest hypervolume*/double RingManageQuery::reduceSetByN( vector<NodeIdent>& inVector, // Vector of nodes vector<NodeIdent>& deletedNodes, int numReduction, // How many nodes to remove double* latencyMatrix) // Pointer to latencyMatrix{ int N = inVector.size(); // Dimension of matrix int colSize = N; int rowSize = N; double maxHyperVolume = 0.0; // Perform reductions iteratively for (int rCount = 0; rCount < numReduction; rCount++) { bool maxHVNodeFound = false; NodeIdent maxHVNode = {0, 0}; double maxHV = 0.0; maxHyperVolume = 0.0; // Reset /* Iterate through the nodes */ for (u_int k = 0; k < inVector.size(); k++) { //if (anchorNodes != NULL && // anchorNodes->find(*eIt) != anchorNodes->end()) { // continue; // We want to skip this anchor, as we can't remove it //} // Swap out the current working column for (int i = 0; i < rowSize; i++) { double tmpValue = latencyMatrix[i * N + k]; latencyMatrix[i * N + k] = latencyMatrix[i * N + colSize - 1]; latencyMatrix[i * N + colSize - 1] = tmpValue; } colSize--; // And the corresponding row information cblas_dswap(colSize, &latencyMatrix[k * N], 1, &latencyMatrix[(rowSize-1) * N], 1); rowSize--; assert(rowSize == colSize); // Calcuate the hypervolume without this node double hyperVolume = calculateHV(N, rowSize, latencyMatrix); /* See if it is the minimum so far Rationale: By removing this node, we still have the maxHV comparing to removing any other node. Therefore, we want to remove this node to keep a big HV */ if (hyperVolume >= maxHV) { maxHVNodeFound = true; maxHVNode = inVector[k]; maxHV = hyperVolume; } // The max hypervolume at this reduction level if (hyperVolume > maxHyperVolume) { maxHyperVolume = hyperVolume; } // Undo row and column swap rowSize++; cblas_dswap(colSize, &latencyMatrix[k * N], 1, &latencyMatrix[(rowSize-1) * N], 1); colSize++; for (int i = 0; i < rowSize; i++) { double tmpValue = latencyMatrix[i * N + k]; latencyMatrix[i * N + k] = latencyMatrix[i * N + colSize - 1]; latencyMatrix[i * N + colSize - 1] = tmpValue; } } if (maxHVNodeFound == false) { // Could not reduce any further, all anchors assert(false); // This shouldn't really happen for any valid case return 0.0; } // For the node that we have removed, remove it from the latency // matrix as well as from the vector of nodes for (u_int k = 0; k < inVector.size(); k++) { if ((inVector[k].addr == maxHVNode.addr) && (inVector[k].port == maxHVNode.port)) { for (int i = 0; i < rowSize; i++) { double tmpValue = latencyMatrix[i * N + k]; latencyMatrix[i * N + k] = latencyMatrix[i * N + colSize - 1]; latencyMatrix[i * N + colSize - 1] = tmpValue; } colSize--; cblas_dswap(colSize, &latencyMatrix[k * N], 1, &latencyMatrix[(rowSize-1) * N], 1); rowSize--; deletedNodes.push_back(inVector[k]); inVector[k] = inVector.back(); inVector.pop_back(); } } } return maxHyperVolume;}double* RingManageQuery::createLatencyMatrix() { int N = remoteNodes.size(); // Dimension of matrix // Allocate the matrix here double* latencyMatrix = (double*) malloc(sizeof(double) * N * N); if (latencyMatrix == NULL) { return NULL; } set<NodeIdent, ltNodeIdent>::iterator outerIt = remoteNodes.begin(); // Create coordnates for each of these nodes for (u_int i = 0; outerIt != remoteNodes.end(); outerIt++, i++) { set<NodeIdent, ltNodeIdent>::iterator innerIt = remoteNodes.begin(); for (u_int j = 0; innerIt != remoteNodes.end(); innerIt++, j++) { if (i == j) { latencyMatrix[i * N + j] = 0.0; } else { map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>::iterator thisNodeMapIt = RetNodeMap.find(*outerIt); if (thisNodeMapIt == RetNodeMap.end()) { ERROR_LOG("Outer member not found: createLatMatrix\n"); free(latencyMatrix); return NULL; } else { map<NodeIdent, u_int, ltNodeIdent>* thisNodeMap = thisNodeMapIt->second; map<NodeIdent, u_int, ltNodeIdent>::iterator findCur = thisNodeMap->find(*innerIt); if (findCur == thisNodeMap->end()) { ERROR_LOG("Inner member not found: createLatMatrix\n"); free(latencyMatrix); return NULL; } else { // Stored in milliseconds latencyMatrix[i * N + j] = findCur->second / 1000.0; } } } WARN_LOG_1("%0.2f ", latencyMatrix[i * N + j]); } WARN_LOG("\n"); } return latencyMatrix;}int RingManageQuery::performReplacement() { if (remoteNodes.size() != RetNodeMap.size()) { ERROR_LOG("Inconsistent data structure for ring replacement\n"); return -1; } const u_int fullPrimRingSize = meridProcess->getRings()->nodesInPrimaryRing(); vector<NodeIdent> removedNodes; while (true) { if (remoteNodes.size() <= fullPrimRingSize) { return 0; // Not enough node to perform replacement } NodeIdent worstNode = {0, 0}; u_int worstNodeCount = UINT_MAX; // Let's ensure we have a full matrix map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>:: iterator retNodeIt = RetNodeMap.begin(); for (; retNodeIt != RetNodeMap.end(); retNodeIt++) { if (((retNodeIt->second)->size() + 1) < remoteNodes.size()) { if ((retNodeIt->second)->size() < worstNodeCount) { worstNode = retNodeIt->first; worstNodeCount = (retNodeIt->second)->size(); } } } if ((worstNode.addr == 0) && (worstNode.port == 0)) { break; // Done, all nodes satisfy } else { removedNodes.push_back(worstNode); u_int numPrev = remoteNodes.size(); removeCandidateNode(worstNode); // Remote node should decrease in size by one on every loop if ((remoteNodes.size() + 1) != numPrev) { ERROR_LOG("Inconsistent data structure for ring replacement\n"); return -1; } } } double* matrix = createLatencyMatrix(); if (matrix != NULL) { vector<NodeIdent> newPrimNodes; //vector<NodeIdent> removedNodes; set<NodeIdent, ltNodeIdent>::iterator it = remoteNodes.begin(); for (; it != remoteNodes.end(); it++) { newPrimNodes.push_back(*it); }#ifdef DEBUG double hv = reduceSetByN(newPrimNodes, removedNodes, newPrimNodes.size() - fullPrimRingSize, matrix); WARN_LOG_1("@@@@@@@@@@@ Max hypervolume is %0.2f @@@@@@@@@@@@\n", hv);#else reduceSetByN(newPrimNodes, removedNodes, newPrimNodes.size() - fullPrimRingSize, matrix);#endif free(matrix); // Matrix must be released if (meridProcess->getRings()->setRingMembers( ringNum, newPrimNodes, removedNodes) == -1) { WARN_LOG("!!!!!!!!!!!! RING REPLACEMENT UNSUCCESSFUL !!!!!!!!\n"); } else { WARN_LOG("************ RING REPLACEMENT SUCCESSFUL **********\n"); } } return 0;}HandleReqGeneric::HandleReqGeneric(uint64_t id, const NodeIdentRendv& in_srcNode, const vector<NodeIdent>& in_remote, MeridianProcess* in_process) : qid(id), srcNode(in_srcNode), finished(false), meridProcess(in_process) { computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); // Copy all targets over for (u_int i = 0; i < in_remote.size(); i++) { if (in_remote[i].addr == 0 && in_remote[i].port == 0) { // Requesting pinging of src node. The src node really // should NOT be behind a firewall NodeIdent tmpIdent = {srcNode.addr, srcNode.port}; remoteNodes.insert(tmpIdent); } else { remoteNodes.insert(in_remote[i]); } } }int HandleReqGeneric::handleLatency( const vector<NodeIdentLat>& in_remoteNodes) { // Done (all retrieved from cache). Send return packet if (remoteLatencies.size() == remoteNodes.size()) { finished = true; // Received results from everybody return sendReturnPacket(); } // Not done, therefore, the vector must have at least one entry if (in_remoteNodes.size() != 1) { return -1; } NodeIdent in_remote = {in_remoteNodes[0].addr, in_remoteNodes[0].port}; u_int latency_us = in_remoteNodes[0].latencyUS; if (remoteNodes.find(in_remote) == remoteNodes.end()) { ERROR_LOG("Received result from unexpected node\n"); return -1; } if (remoteLatencies.find(in_remote) != remoteLatencies.end()) { ERROR_LOG("Received duplicate results\n"); return -1; } remoteLatencies[in_remote] = latency_us; if (remoteLatencies.size() != remoteNodes.size()) { return 0; // More results needed } finished = true; // Received results from everybody return sendReturnPacket();}int HandleReqGeneric::handleTimeout() { finished = true; return sendReturnPacket();}int HandleReqGeneric::sendReturnPacket() { RetPing retPacket(qid); map<NodeIdent, u_int, ltNodeIdent>::iterator it = remoteLatencies.begin(); for (; it != remoteLatencies.end(); it++) { retPacket.addNode(it->first, it->second); } RealPacket* inPacket = new RealPacket(srcNode); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } meridProcess->addOutPacket(inPacket); return 0;} int HandleReqGeneric::init() { //setStartTime(); set<NodeIdent, ltNodeIdent>* curRemoteNodes = getRemoteNodes(); set<NodeIdent, ltNodeIdent>::iterator it = curRemoteNodes->begin(); for (; it != curRemoteNodes->end(); it++) { NodeIdent curIdent = *it;/* if (curIdent.addr == 0 && curIdent.port == 0) { // Requesting pinging of src node. The src node really // should NOT be behind a firewall curIdent.addr = srcNode.addr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -