📄 query.cpp
字号:
betaRatio = 0.5; } // Calculate the forwarding threshold u_int latencyThreshold = 0; long long tmpLatThreshold_ll = llround(betaRatio * (double)averageLatUS); // Just to be extra careful with rounding if (tmpLatThreshold_ll > UINT_MAX) { latencyThreshold = UINT_MAX; } else if (tmpLatThreshold_ll < 0) { latencyThreshold = 0; } else { latencyThreshold = (u_int)tmpLatThreshold_ll; } WARN_LOG_1("Latency threshold is %d\n", latencyThreshold); WARN_LOG_1("Lowest latency is %d\n", lowestLatUS); WARN_LOG_1("My latency is %d\n", averageLatUS); if (lowestLatUS == 0 || lowestLatUS > latencyThreshold) { // Did not meet the threshold, return closest so far RetResponse* retResp = NULL; // Pick the closest we know right now and return if (lowestLatUS < averageLatUS) { // Reusing iterator it it = ringLatencies.find(closestMember); assert(it != ringLatencies.end()); retResp = new RetResponse(qid, closestMember.addr, closestMember.port, *(it->second)); } else { // Itself is the closest retResp = new RetResponse(qid, 0, 0, remoteLatencies); } RealPacket* inPacket = new RealPacket(srcNode); if (retResp->createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } delete retResp; // Done with RetResponse finished = true; } else {#ifdef DEBUG u_int netAddr = htonl(closestMember.addr); char* remoteString = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("Forwarding to closest member %s:%d\n", remoteString, closestMember.port);#endif NodeIdent tmpRendvNode = meridProcess->returnRendv(); ReqClosestGeneric* reqClosest = createReqClosest(qid, betaNumer, betaDenom, tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>::iterator it = remoteNodes.begin(); for (; it != remoteNodes.end(); it++) { reqClosest->addTarget(*it); } NodeIdentRendv tmpRendvOut = {closestMember.addr, closestMember.port, 0, 0}; set<NodeIdentRendv, ltNodeIdentRendv>::iterator setRendvIt = ringMembers.find(tmpRendvOut); if (setRendvIt == ringMembers.end()) { ERROR_LOG("Data in HandleClosestGeneric inconsistent\n"); } else { // This gets the rendavous data tmpRendvOut = *setRendvIt; } RealPacket* inPacket = new RealPacket(tmpRendvOut); if (reqClosest->createRealPacket(*inPacket) == -1) { delete inPacket; finished = true; } else { selectedMember = closestMember; meridProcess->addOutPacket(inPacket); computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); stateMachine = HC_WAIT_FOR_FIN; } delete reqClosest; } return 0;}int HandleClosestGeneric::sendReqProbes() { if (stateMachine != HC_WAIT_FOR_DIRECT_PING || remoteLatencies.size() != remoteNodes.size()) { return -1; // More results needed } // Find the longest and average time in the remoteLatenties map u_int largestLatUS = 0; u_int smallestLatUS = UINT_MAX; //if (getMaxAndAverage(remoteLatencies, // &largestLatUS, &averageLatUS) == -1) { if (getMaxAndMinAndAverage(remoteLatencies, &largestLatUS, &smallestLatUS, &averageLatUS) == -1) { ERROR_LOG("Incorrect MAX/MIN/AVG calculation\n"); finished = true; return -1; } WARN_LOG_1("Largest latency is %d\n", largestLatUS); double betaRatio = ((double)betaNumer) / ((double)betaDenom); WARN_LOG_1("Beta ratio is %0.2f\n", betaRatio); if (betaRatio <= 0.0 || betaRatio >= 1.0) { ERROR_LOG("Illegal beta parameter\n"); betaRatio = 0.5; } // Update timeout u_int newTimeoutPeriod = (u_int) ceil( ((2.0 * betaRatio) + 1.0) * (double)largestLatUS); computeTimeout(newTimeoutPeriod, &timeoutTV); // Change to next state stateMachine = HC_INDIRECT_PING; // if ((meridProcess->getRings()->fillVector(averageLatUS, betaRatio, // ringMembers) == -1) || (ringMembers.size() == 0)) { if ((averageLatUS == 0) || //(meridProcess->getRings()->fillVector(averageLatUS, // averageLatUS, betaRatio, ringMembers) == -1) || (meridProcess->getRings()->fillVector(smallestLatUS, largestLatUS, betaRatio, ringMembers) == -1) || (ringMembers.size() == 0)) { // 0, 0 means itself RetResponse retPacket(qid, 0, 0, remoteLatencies); RealPacket* inPacket = new RealPacket(srcNode); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } finished = true; return 0; } // Send a ReqTCPProbeAverage to each of the ring members set<NodeIdentRendv, ltNodeIdentRendv>::iterator setIt = ringMembers.begin(); for (; setIt != ringMembers.end(); setIt++) {/* NodeIdent rendvIdent; NodeIdentRendv outIdent = {setIt->addr, setIt->port, 0, 0}; if (meridProcess->getRings()->rendvLookup(*setIt, rendvIdent) != -1) { outIdent.portRendv = rendvIdent.port; outIdent.addrRendv = rendvIdent.addr; } ReqProbeGeneric* newQuery = createReqProbe(outIdent, remoteNodes, meridProcess);*/ ReqProbeGeneric* newQuery = createReqProbe(*setIt, remoteNodes, meridProcess); if (meridProcess->getQueryTable()->insertNewQuery( newQuery) == -1) { delete newQuery; } else { newQuery->subscribeLatency(qid); newQuery->init(); } } return 0;}int HandleClosestGeneric::handleLatency( const vector<NodeIdentLat>& in_remoteNodes) { // Determine action depending on state switch (stateMachine) { case HC_INIT: { return 0; // Unexpected, just return 0 } break; case HC_WAIT_FOR_DIRECT_PING: { if (remoteLatencies.size() == remoteNodes.size()) { return sendReqProbes(); } // Must have at least one entry in vector unless all entries // are already accounted for if (in_remoteNodes.size() != 1) { return -1; } NodeIdent in_remote = {in_remoteNodes[0].addr, in_remoteNodes[0].port}; u_int latency_us = in_remoteNodes[0].latencyUS; if (remoteNodes.find(in_remote) == remoteNodes.end()) { return -1; } if (remoteLatencies.find(in_remote) != remoteLatencies.end()) { ERROR_LOG("Received duplicate results\n"); return -1; } remoteLatencies[in_remote] = latency_us; if (remoteLatencies.size() != remoteNodes.size()) { return 0; // More results needed } // All results in, can send indirect probes return sendReqProbes(); } break; case HC_INDIRECT_PING: { if (in_remoteNodes.size() < 1) { ERROR_LOG("There must be at least one entry\n"); return -1; } // HACK: The first entry tell me where the source is NodeIdent curNode = {in_remoteNodes[0].addr, in_remoteNodes[0].port}; // The comparison function doesn't care about the rendv // part of the NodeIdentRendv structure NodeIdentRendv curNodeRendv = {in_remoteNodes[0].addr, in_remoteNodes[0].port, 0, 0}; // Has to be a ring member if (ringMembers.find(curNodeRendv) == ringMembers.end()) { ERROR_LOG("Results from an unexpected node\n"); return -1; } // This ring member must not already have a latency map if (ringLatencies.find(curNode) != ringLatencies.end()) { ERROR_LOG("Duplicate response from node\n"); return -1; } map<NodeIdent, u_int, ltNodeIdent>* tmpIdentMap = new map<NodeIdent, u_int, ltNodeIdent>(); for (u_int i = 1; i < in_remoteNodes.size(); i++) { NodeIdent tmpIdent = {in_remoteNodes[i].addr, in_remoteNodes[i].port}; // Each result must be part of remoteNodes if (remoteNodes.find(tmpIdent) == remoteNodes.end()) { ERROR_LOG("Results consist of unexpected nodes\n"); delete tmpIdentMap; return -1; } (*tmpIdentMap)[tmpIdent] = in_remoteNodes[i].latencyUS; } // The result size must equal the number of remote nodes if (remoteNodes.size() != tmpIdentMap->size()) { ERROR_LOG("Incorrect number of nodes responded\n"); delete tmpIdentMap; return -1; } ringLatencies[curNode] = tmpIdentMap; // If some ring members haven't answered yet, wait for more if (ringMembers.size() != ringLatencies.size()) { return 0; // More responses expected } return handleForward(); } break; case HC_WAIT_FOR_FIN: { return 0; // Unexpected } break; default: { assert(false); } break; } return 0; }int HandleClosestGeneric::handleTimeout() { switch (stateMachine) { case HC_INIT: { // Init wasn't called after being pushed into query table finished = true; } break; case HC_WAIT_FOR_DIRECT_PING: { // Can't ping every body, return a RET_ERROR RetError retPacket(qid); RealPacket* inPacket = new RealPacket(srcNode); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } finished = true; } break; case HC_INDIRECT_PING: { return handleForward(); } break; case HC_WAIT_FOR_FIN: { // Done, nothing to forward back, something screwed up. Send // back an error packet RetError retPacket(qid); RealPacket* inPacket = new RealPacket(srcNode); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } finished = true; } break; default: { assert(false); } break; } return 0;}HandleClosestGeneric::~HandleClosestGeneric() { map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>::iterator it = ringLatencies.begin(); for (; it != ringLatencies.end(); it++) { if (it->second != NULL) { delete it->second; } }}int HandleClosestTCP::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) { return getMerid()->tcpCacheGetLatency(inNode, latencyUS);}int HandleClosestDNS::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) { return getMerid()->dnsCacheGetLatency(inNode, latencyUS);}int HandleClosestPing::getLatency(const NodeIdent& inNode, uint32_t* latencyUS){ return getMerid()->pingCacheGetLatency(inNode, latencyUS);}#ifdef PLANET_LAB_SUPPORTint HandleClosestICMP::getLatency(const NodeIdent& inNode, uint32_t* latencyUS){ return getMerid()->icmpCacheGetLatency(inNode, latencyUS);}#endifint ReqProbeTCP::init() { //gettimeofday(&startTime, NULL); NodeIdent tmpRendvNode = getMerid()->returnRendv(); ReqMeasureTCP tcpPacket(getQueryID(), tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>* curRemoteNodes = getRemoteNodes(); set<NodeIdent, ltNodeIdent>::iterator it = curRemoteNodes->begin(); for (; it != curRemoteNodes->end(); it++) { tcpPacket.addTarget(*it); } RealPacket* inPacket = new RealPacket(getSrcNode()); if (tcpPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } getMerid()->addOutPacket(inPacket); return 0;}int ReqProbeDNS::init() { //gettimeofday(&startTime, NULL); NodeIdent tmpRendvNode = getMerid()->returnRendv(); ReqMeasureDNS dnsPacket(getQueryID(), tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>* curRemoteNodes = getRemoteNodes(); set<NodeIdent, ltNodeIdent>::iterator it = curRemoteNodes->begin(); for (; it != curRemoteNodes->end(); it++) { dnsPacket.addTarget(*it); } RealPacket* inPacket = new RealPacket(getSrcNode()); if (dnsPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } getMerid()->addOutPacket(inPacket); return 0;}int ReqProbePing::init() { //gettimeofday(&startTime, NULL); NodeIdent tmpRendvNode = getMerid()->returnRendv(); ReqMeasurePing pingPacket( getQueryID(), tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>* curRemoteNodes = getRemoteNodes(); set<NodeIdent, ltNodeIdent>::iterator it = curRemoteNodes->begin(); for (; it != curRemoteNodes->end(); it++) { pingPacket.addTarget(*it); } RealPacket* inPacket = new RealPacket(getSrcNode()); if (pingPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } getMerid()->addOutPacket(inPacket); return 0;}#ifdef PLANET_LAB_SUPPORTint ReqProbeICMP::init() { //gettimeofday(&startTime, NULL); NodeIdent tmpRendvNode = getMerid()->returnRendv(); ReqMeasureICMP pingPacket( getQueryID(), tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>* curRemoteNodes = getRemoteNodes(); set<NodeIdent, ltNodeIdent>::iterator it = curRemoteNodes->begin(); for (; it != curRemoteNodes->end(); it++) { pingPacket.addTarget(*it); } RealPacket* inPacket = new RealPacket(getSrcNode()); if (pingPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } getMerid()->addOutPacket(inPacket); return 0;}#endifReqProbeGeneric::ReqProbeGeneric(const NodeIdentRendv& in_src_node, const set<NodeIdent, ltNodeIdent>& in_remote, MeridianProcess* in_process) : srcNode(in_src_node), finished(false), meridProcess(in_process) { qid = meridProcess->getNewQueryID(); computeTimeout(2 * MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); // Copy all targets over remoteNodes.insert(in_remote.begin(), in_remote.end());}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -