📄 query.cpp
字号:
ReqProbeGeneric::ReqProbeGeneric(const NodeIdentRendv& in_src_node, const set<NodeIdentConst, ltNodeIdentConst>& in_remote, MeridianProcess* in_process) : srcNode(in_src_node), finished(false), meridProcess(in_process) { qid = meridProcess->getNewQueryID(); computeTimeout(2 * MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); // Copy all targets over set<NodeIdentConst, ltNodeIdentConst>::const_iterator it = in_remote.begin(); for (; it != in_remote.end(); it++) { NodeIdent tmp = {it->addr, it->port}; remoteNodes.insert(tmp); }}int ReqProbeGeneric::handleEvent( const NodeIdent& in_remote, const char* inPacket, int packetSize) { if (inPacket[0] != RET_PING_REQ) { ERROR_LOG("Expecting RET_PING_REQ packet, received something else\n"); return -1; // Not pong packet } if (in_remote.addr != srcNode.addr || in_remote.port != srcNode.port) { ERROR_LOG("Received packet from unexpected node\n"); return -1; } RetPing* newRetPing = RetPing::parse(inPacket, packetSize); if (newRetPing == NULL) { ERROR_LOG("Incorrect packet received\n"); return -1; } const vector<NodeIdentLat>* tmpVectLat = newRetPing->returnNodes(); // Not all the nodes are there if (tmpVectLat->size() != remoteNodes.size()) { ERROR_LOG("Only partial list of nodes returned\n"); delete newRetPing; return -1; } vector<NodeIdentLat> newTmpVect; // HACK: Add srcNode to the vector before telling subscriber NodeIdentLat outNIL = {srcNode.addr, srcNode.port, 0}; newTmpVect.push_back(outNIL); for (u_int i = 0; i < tmpVectLat->size(); i++) { newTmpVect.push_back((*tmpVectLat)[i]); } // Tell subscribers for (u_int i = 0; i < subscribers.size(); i++) { meridProcess->getQueryTable()->notifyQLatency( subscribers[i], newTmpVect); } delete newRetPing; // Done with packet finished = true; // Done with query return 0;}int ReqProbeGeneric::handleTimeout() { NodeIdent tmpIdent = {srcNode.addr, srcNode.port}; meridProcess->getRings()->eraseNode(tmpIdent); finished = true; return 0; }int ReqProbeGeneric::subscribeLatency(uint64_t in_qid) { subscribers.push_back(in_qid); return 0;}HandleMCGeneric::HandleMCGeneric(uint64_t id, u_short in_betaNumer, u_short in_betaDenom, const NodeIdentRendv& in_srcNode, const vector<NodeIdentConst>& in_remote, MeridianProcess* in_process) : qid(id), betaNumer(in_betaNumer), betaDenom(in_betaDenom), srcNode(in_srcNode), finished(false), meridProcess(in_process) { selectedMember.addr = 0; selectedMember.port = 0; computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); // Copy all targets over for (u_int i = 0; i < in_remote.size(); i++) { remoteNodes.insert(in_remote[i]); } stateMachine = HMC_INIT;}int HandleMCGeneric::init() { //gettimeofday(&startTime, NULL); set<NodeIdentConst, ltNodeIdentConst>::iterator it = remoteNodes.begin(); for (; it != remoteNodes.end(); it++) { NodeIdent tmp = {it->addr, it->port}; uint32_t curLatencyUS; if (getLatency(tmp, &curLatencyUS) == -1) { ProbeQueryGeneric* newQuery = createProbeQuery(tmp, meridProcess); if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) { delete newQuery; continue; } newQuery->subscribeLatency(qid); newQuery->init(); } else { remoteLatencies[tmp] = curLatencyUS; } } RetInfo curRetInfo(qid, 0, 0); // Send back an intermediate info packet RealPacket* inPacket = new RealPacket(srcNode); if (curRetInfo.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } stateMachine = HMC_WAIT_FOR_DIRECT_PING; if (remoteLatencies.size() == remoteNodes.size()) { // It's done, tell the query it is vector<NodeIdentLat> dummy; getMerid()->getQueryTable()->notifyQLatency(getQueryID(), dummy); } return 0; }int HandleMCGeneric::handleEvent( const NodeIdent& in_remote, const char* inPacket, int packetSize) { // Forward certain types of packets backwards, // such as RET_RESPONSE, in which case set finished = true if (stateMachine == HMC_WAIT_FOR_FIN) { BufferWrapper rb(inPacket, packetSize); char queryType; uint64_t queryID; if (Packet::parseHeader(rb, &queryType, &queryID) == -1) { assert(false); // This should not be possible } if (queryType == getQueryType()) { // Query may have gone in a loop, let just say we are the closest WARN_LOG("WARNING: Query may have gone in a loop\n"); RetResponse retPacket(qid, 0, 0, remoteLatencies); // NOTE: Don't need to use rendavous node as there should be a // hole in the NAT to in_remote as we just received the packet RealPacket* inPacket = new RealPacket(in_remote); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } } else if ((in_remote.addr == selectedMember.addr) && (in_remote.port == selectedMember.port)) { WARN_LOG("Received packet from selected ring member\n"); if (queryType == RET_RESPONSE) { RetResponse* retResp = RetResponse::parse(in_remote, inPacket, packetSize); if (retResp == NULL) { ERROR_LOG("Malformed packet received\n"); return -1; } RealPacket* inPacket = new RealPacket(srcNode); if (retResp->createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } delete retResp; // Done with RetResponse finished = true; } else if (queryType == RET_ERROR) { RetError* retErr = RetError::parse(inPacket, packetSize); if (retErr == NULL) { ERROR_LOG("Malformed packet received\n"); return -1; } RealPacket* inPacket = new RealPacket(srcNode); if (retErr->createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } delete retErr; // Done with RetResponse finished = true; } else if (queryType == RET_INFO) { RetInfo* curRetInfo = RetInfo::parse(in_remote, inPacket, packetSize); if (curRetInfo == NULL) { ERROR_LOG("Malformed packet received\n"); return -1; } RealPacket* inPacket = new RealPacket(srcNode); if (curRetInfo->createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } delete curRetInfo; } } } return 0; }int HandleMCGeneric::handleForward() { // We have all the information necessary to make a forwarding decision u_int lowestLatUS = UINT_MAX; NodeIdent closestMember = {0, 0}; // For the lowest latency node by iterating through the ring members // that have returned results back map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>:: iterator it = ringLatencies.begin(); for (; it != ringLatencies.end(); it++) { u_int curAvgLatency; if (getAvgSolution(*(it->second), &curAvgLatency) == -1) { ERROR_LOG("Incorrect average solution calculation\n"); continue; } if (curAvgLatency < lowestLatUS) { lowestLatUS = curAvgLatency; closestMember = it->first; } } double betaRatio = ((double)betaNumer) / ((double)betaDenom); if (betaRatio <= 0.0 || betaRatio >= 1.0) { ERROR_LOG("Illegal beta parameter\n"); betaRatio = 0.5; } // Calculate the forwarding threshold u_int latencyThreshold = 0; long long tmpLatThreshold_ll = llround(betaRatio * (double)averageLatUS); // Just to be extra careful with rounding if (tmpLatThreshold_ll > UINT_MAX) { latencyThreshold = UINT_MAX; } else if (tmpLatThreshold_ll < 0) { latencyThreshold = 0; } else { latencyThreshold = (u_int)tmpLatThreshold_ll; } WARN_LOG_1("Latency threshold is %d\n", latencyThreshold); WARN_LOG_1("Lowest latency is %d\n", lowestLatUS); WARN_LOG_1("My latency is %d\n", averageLatUS); if (lowestLatUS == 0 || lowestLatUS > latencyThreshold) { // Did not meet the threshold, return closest so far RetResponse* retResp = NULL; // Pick the closest we know right now and return if (lowestLatUS < averageLatUS) { // Reusing iterator it it = ringLatencies.find(closestMember); assert(it != ringLatencies.end()); retResp = new RetResponse(qid, closestMember.addr, closestMember.port, *(it->second)); } else { // Itself is the closest retResp = new RetResponse(qid, 0, 0, remoteLatencies); } RealPacket* inPacket = new RealPacket(srcNode); if (retResp->createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } delete retResp; // Done with RetResponse finished = true; } else {#ifdef DEBUG u_int netAddr = htonl(closestMember.addr); char* remoteString = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("Forwarding to closest member %s:%d\n", remoteString, closestMember.port);#endif NodeIdent tmpRendvNode = meridProcess->returnRendv(); ReqConstraintGeneric* reqMC = createReqConstraint(qid, betaNumer, betaDenom, tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdentConst, ltNodeIdentConst>::iterator it = remoteNodes.begin(); for (; it != remoteNodes.end(); it++) { reqMC->addTarget(*it); } NodeIdentRendv tmpRendvOut = {closestMember.addr, closestMember.port, 0, 0}; set<NodeIdentRendv, ltNodeIdentRendv>::iterator setRendvIt = ringMembers.find(tmpRendvOut); if (setRendvIt == ringMembers.end()) { ERROR_LOG("Data in HandleClosestGeneric inconsistent\n"); } else { // This gets the rendavous data tmpRendvOut = *setRendvIt; } RealPacket* inPacket = new RealPacket(tmpRendvOut); //RealPacket* inPacket = new RealPacket(closestMember); if (reqMC->createRealPacket(*inPacket) == -1) { delete inPacket; finished = true; } else { selectedMember = closestMember; meridProcess->addOutPacket(inPacket); computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); stateMachine = HMC_WAIT_FOR_FIN; } delete reqMC; } return 0;}int HandleMCGeneric::sendReqProbes() { if (stateMachine != HMC_WAIT_FOR_DIRECT_PING || remoteLatencies.size() != remoteNodes.size()) { return -1; // More results needed } // Find the longest and average time in the remoteLatenties map u_int largestAddUS; if (getMaxSolution(remoteLatencies, &largestAddUS, false) == -1) { ERROR_LOG("Incorrect MAX calculation\n"); finished = true; return -1; } u_int largestSubUS; if (getMaxSolution(remoteLatencies, &largestSubUS, true) == -1) { ERROR_LOG("Incorrect MAX calculation\n"); finished = true; return -1; } if (getAvgSolution(remoteLatencies, &averageLatUS) == -1) { ERROR_LOG("Incorrect AVG calculation\n"); finished = true; return -1; } WARN_LOG_1("Largest latency is %d\n", largestAddUS); double betaRatio = ((double)betaNumer) / ((double)betaDenom); WARN_LOG_1("Beta ratio is %0.2f\n", betaRatio); if (betaRatio <= 0.0 || betaRatio >= 1.0) { ERROR_LOG("Illegal beta parameter\n"); betaRatio = 0.5; } // Update timeout u_int newTimeoutPeriod = (u_int) ceil( ((2.0 * betaRatio) + 1.0) * (double)largestAddUS); computeTimeout(newTimeoutPeriod, &timeoutTV); // Change to next state stateMachine = HMC_INDIRECT_PING; // Have to worry about 0 latencies for multiconstraint if ((averageLatUS == 0) || (meridProcess->getRings()->fillVector(largestSubUS, largestAddUS, betaRatio, ringMembers) == -1) || (ringMembers.size() == 0)) { // 0, 0 means itself RetResponse retPacket(qid, 0, 0, remoteLatencies); RealPacket* inPacket = new RealPacket(srcNode); if (retPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } finished = true; return 0; } // Send a ReqTCPProbeAverage to each of the ring members set<NodeIdentRendv, ltNodeIdentRendv>::iterator setIt = ringMembers.begin(); for (; setIt != ringMembers.end(); setIt++) {/* NodeIdent rendvIdent; NodeIdentRendv outIdent = {setIt->addr, setIt->port, 0, 0}; if (meridProcess->getRings()->rendvLookup(*setIt, rendvIdent) != -1) { outIdent.portRendv = rendvIdent.port; outIdent.addrRendv = rendvIdent.addr; } ReqProbeGeneric* newQuery = createReqProbe(outIdent, remoteNodes, meridProcess);*/ ReqProbeGeneric* newQuery = createReqProbe(*setIt, remoteNodes, meridProcess); if (meridProcess->getQueryTable()->insertNewQuery( newQuery) == -1) { delete newQuery; } else { newQuery->subscribeLatency(qid); newQuery->init(); } } return 0;}int HandleMCGeneric::handleLatency( const vector<NodeIdentLat>& in_remoteNodes) { // Determine action depending on state switch (stateMachine) { case HMC_INIT: { return 0; // Unexpected, just return 0 } break; case HMC_WAIT_FOR_DIRECT_PING: { if (remoteLatencies.size() == remoteNodes.size()) { return sendReqProbes(); } // Must have at least one entry in vector unless all entries // are already accounted for if (in_remoteNodes.size() != 1) { return -1; } // NOTE: NodeIdentConst is keyed only on the addr and port // so this will still match NodeIdentConst in_remote_const = {in_remoteNodes[0].addr, in_remoteNodes[0].port, 0}; NodeIdent in_remote = {in_remote_const.addr, in_remote_const.port}; u_int latency_us = in_remoteNodes[0].latencyUS; if (remoteNodes.find(in_r
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -