📄 query.cpp
        curIdent.port = srcNode.port; }*/
        uint32_t curLatencyUS;
        if (getLatency(curIdent, &curLatencyUS) == -1) {
            ProbeQueryGeneric* newQuery = createProbeQuery(curIdent, getMerid());
            if (getMerid()->getQueryTable()->insertNewQuery(newQuery) == -1) {
                delete newQuery;
                continue;
            }
            newQuery->subscribeLatency(getQueryID());
            newQuery->init();
        } else {
            remoteLatencies[curIdent] = curLatencyUS;
        }
    }
    if (remoteLatencies.size() == remoteNodes.size()) {
        // It's done, tell the query it is
        vector<NodeIdentLat> dummy;
        getMerid()->getQueryTable()->notifyQLatency(getQueryID(), dummy);
    }
    return 0;
}

int HandleReqTCP::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) {
    return getMerid()->tcpCacheGetLatency(inNode, latencyUS);
}

int HandleReqDNS::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) {
    return getMerid()->dnsCacheGetLatency(inNode, latencyUS);
}

int HandleReqPing::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) {
    return getMerid()->pingCacheGetLatency(inNode, latencyUS);
}

#ifdef PLANET_LAB_SUPPORT
int HandleReqICMP::getLatency(const NodeIdent& inNode, uint32_t* latencyUS) {
    return getMerid()->icmpCacheGetLatency(inNode, latencyUS);
}
#endif

ProbeQueryGeneric::ProbeQueryGeneric(const NodeIdent& in_remote,
        MeridianProcess* in_process)
        : sockFD(-1), remoteNode(in_remote), finished(false),
          meridProcess(in_process) {
    qid = meridProcess->getNewQueryID();
    computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);
}

void ProbeQueryTCP::insertCache(const NodeIdent& inNode, uint32_t latencyUS) {
    getMerid()->tcpCacheInsert(inNode, latencyUS);
}

int ProbeQueryTCP::init() {
    setStartTime();
    WARN_LOG("ProbeQueryTCP: Adding new TCP connection\n");
    setSockFD(getMerid()->addTCPConnection(getQueryID(), getRemoteNode()));
    return 0;
}

int ProbeQueryTCP::handleTimeout() {
    WARN_LOG("##################### TCP QUERY TIMEOUT ###################\n");
    int tmpFD = getSockFD();
    if (tmpFD != -1) {
        getMerid()->eraseTCPConnection(tmpFD);
    }
    setFinished(true);
    return 0;
}

void ProbeQueryDNS::insertCache(const NodeIdent& inNode, uint32_t latencyUS) {
    getMerid()->dnsCacheInsert(inNode, latencyUS);
}

int ProbeQueryDNS::init() {
    setStartTime();
    WARN_LOG("ProbeQueryDNS: Adding new DNS connection\n");
    setSockFD(getMerid()->addDNSConnection(getQueryID(), getRemoteNode()));
    return 0;
}

int ProbeQueryDNS::handleTimeout() {
    WARN_LOG("##################### DNS QUERY TIMEOUT ###################\n");
    WARN_LOG("Erasing old DNS connections\n");
    int tmpFD = getSockFD();
    if (tmpFD != -1) {
        getMerid()->eraseDNSConnection(tmpFD);
    }
    setFinished(true);
    return 0;
}

void ProbeQueryPing::insertCache(const NodeIdent& inNode, uint32_t latencyUS) {
    getMerid()->pingCacheInsert(inNode, latencyUS);
}

int ProbeQueryPing::init() {
    setStartTime();
    PingPacket pingPacket(getQueryID());
    RealPacket* inPacket = new RealPacket(getRemoteNode());
    if (pingPacket.createRealPacket(*inPacket) == -1) {
        delete inPacket;
        return -1;
    }
    WARN_LOG("ProbeQueryPing: Sending Ping packet\n");
    getMerid()->addOutPacket(inPacket);
    return 0;
}

int ProbeQueryPing::handleTimeout() {
    WARN_LOG("##################### PING QUERY TIMEOUT ###################\n");
    setFinished(true);
    return 0;
}

#ifdef PLANET_LAB_SUPPORT
void ProbeQueryICMP::insertCache(const NodeIdent& inNode, uint32_t latencyUS) {
    getMerid()->icmpCacheInsert(inNode, latencyUS);
}

int ProbeQueryICMP::init() {
    setStartTime();
    WARN_LOG("ProbeQueryICMP: Sending ICMP ECHO packet\n");
    return getMerid()->sendICMPProbe(getQueryID(), getRemoteNode().addr);
}

int ProbeQueryICMP::handleTimeout() {
    WARN_LOG("##################### ICMP QUERY TIMEOUT ###################\n");
    setFinished(true);
    return 0;
}
#endif
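
/*
 * Probe lifecycle, as implemented by the classes above: init() records a
 * start time and issues exactly one probe -- a TCP connect, a DNS lookup, a
 * ping packet, or (under PLANET_LAB_SUPPORT) an ICMP echo. The reply comes
 * back through handleEvent()/handleLatency() below, which computes the
 * elapsed time, notifies every subscribed query via notifyQLatency(), stores
 * the measurement with insertCache(), and marks the probe finished.
 * handleTimeout() gives up after MAX_RTT_MS and tears down any open TCP or
 * DNS connection.
 */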
int ProbeQueryGeneric::subscribeLatency(uint64_t in_qid) {
    subscribers.push_back(in_qid);
    return 0;
}

int ProbeQueryGeneric::handleEvent(const NodeIdent& in_remote,
        const char* inPacket, int packetSize) {
    // Just push down an empty latency event instead
    vector<NodeIdentLat> tmpVect;
    NodeIdentLat tmpIdent = {in_remote.addr, in_remote.port, 0};
    tmpVect.push_back(tmpIdent);
    return handleLatency(tmpVect);
}

// If we receive a handleLatency, that means the connect completed
// correctly and we don't need to perform an explicit eraseTCPConnection
int ProbeQueryGeneric::handleLatency(const vector<NodeIdentLat>& in_remoteNodes) {
    if (in_remoteNodes.size() != 1) {
        return -1;
    }
    NodeIdent in_remote = {in_remoteNodes[0].addr, in_remoteNodes[0].port};
    // Note: latency_us is always 0, it's actually measured within the query
    //if (in_remote.addr != remoteNode.addr ||
    //        in_remote.port != remoteNode.port) {
    // Temporary hack, don't check port to support ICMP
    if (in_remote.addr != remoteNode.addr) {
        ERROR_LOG("Received packet from unexpected node\n");
        return -1;
    }
    struct timeval curTime;
    gettimeofday(&curTime, NULL);
    u_int realLatencyUS = (curTime.tv_sec - startTime.tv_sec) * MICRO_IN_SECOND
        + curTime.tv_usec - startTime.tv_usec;
    NodeIdentLat outNIL = {remoteNode.addr, remoteNode.port, realLatencyUS};
    vector<NodeIdentLat> subVect;
    subVect.push_back(outNIL);
    for (u_int i = 0; i < subscribers.size(); i++) {
        meridProcess->getQueryTable()->notifyQLatency(subscribers[i], subVect);
    }
    // Add to cache entry for future use
    insertCache(remoteNode, realLatencyUS);
    finished = true;
    return 0;
}

HandleClosestGeneric::HandleClosestGeneric(uint64_t id, u_short in_betaNumer,
        u_short in_betaDenom, const NodeIdentRendv& in_srcNode,
        const vector<NodeIdent>& in_remote, MeridianProcess* in_process)
        : qid(id), betaNumer(in_betaNumer), betaDenom(in_betaDenom),
          srcNode(in_srcNode), finished(false), meridProcess(in_process) {
    selectedMember.addr = 0;
    selectedMember.port = 0;
    computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);
    // Copy all targets over
    for (u_int i = 0; i < in_remote.size(); i++) {
        remoteNodes.insert(in_remote[i]);
    }
    stateMachine = HC_INIT;
}

int HandleClosestGeneric::init() {
    //gettimeofday(&startTime, NULL);
    set<NodeIdent, ltNodeIdent>::iterator it = remoteNodes.begin();
    for (; it != remoteNodes.end(); it++) {
        uint32_t curLatencyUS;
        if (getLatency(*it, &curLatencyUS) == -1) {
            ProbeQueryGeneric* newQuery = createProbeQuery(*it, meridProcess);
            if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) {
                delete newQuery;
                continue;
            }
            newQuery->subscribeLatency(qid);
            newQuery->init();
        } else {
            remoteLatencies[*it] = curLatencyUS;
        }
    }
    RetInfo curRetInfo(qid, 0, 0);
    // Send back an intermediate info packet
    RealPacket* inPacket = new RealPacket(srcNode);
    if (curRetInfo.createRealPacket(*inPacket) == -1) {
        delete inPacket;
    } else {
        meridProcess->addOutPacket(inPacket);
    }
    stateMachine = HC_WAIT_FOR_DIRECT_PING;
    if (remoteLatencies.size() == remoteNodes.size()) {
        // It's done, tell the query it is
        vector<NodeIdentLat> dummy;
        getMerid()->getQueryTable()->notifyQLatency(getQueryID(), dummy);
    }
    return 0;
}
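
/*
 * Closest-node query state machine, as used in this file: the constructor
 * starts in HC_INIT; init() above issues (or reuses cached) latency probes
 * to every target and moves to HC_WAIT_FOR_DIRECT_PING; in HC_WAIT_FOR_FIN,
 * handleEvent() below relays RET_RESPONSE, RET_ERROR, and RET_INFO packets
 * from the selected ring member back to srcNode, finishing on a response or
 * an error.
 */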
int HandleClosestGeneric::handleEvent(const NodeIdent& in_remote,
        const char* inPacket, int packetSize) {
    // Forward certain types of packets backwards,
    // such as RET_RESPONSE, in which case set finished = true
    if (stateMachine == HC_WAIT_FOR_FIN) {
        BufferWrapper rb(inPacket, packetSize);
        char queryType;
        uint64_t queryID;
        if (Packet::parseHeader(rb, &queryType, &queryID) == -1) {
            assert(false);  // This should not be possible
        }
        if (queryType == getQueryType()) {
            // Query may have gone in a loop, let's just say we are the closest
            WARN_LOG("WARNING: Query may have gone in a loop\n");
            RetResponse retPacket(qid, 0, 0, remoteLatencies);
            // Note: Don't need to go through rendezvous as we
            // just received packet from in_remote, there should
            // be a hole in the NAT for the return
            RealPacket* inPacket = new RealPacket(in_remote);
            if (retPacket.createRealPacket(*inPacket) == -1) {
                delete inPacket;
            } else {
                meridProcess->addOutPacket(inPacket);
            }
        } else if ((in_remote.addr == selectedMember.addr) &&
                (in_remote.port == selectedMember.port)) {
            WARN_LOG("Received packet from selected ring member\n");
            if (queryType == RET_RESPONSE) {
                RetResponse* retResp =
                    RetResponse::parse(in_remote, inPacket, packetSize);
                if (retResp == NULL) {
                    ERROR_LOG("Malformed packet received\n");
                    return -1;
                }
                RealPacket* inPacket = new RealPacket(srcNode);
                if (retResp->createRealPacket(*inPacket) == -1) {
                    delete inPacket;
                } else {
                    meridProcess->addOutPacket(inPacket);
                }
                delete retResp; // Done with RetResponse
                finished = true;
            } else if (queryType == RET_ERROR) {
                RetError* retErr = RetError::parse(inPacket, packetSize);
                if (retErr == NULL) {
                    ERROR_LOG("Malformed packet received\n");
                    return -1;
                }
                RealPacket* inPacket = new RealPacket(srcNode);
                if (retErr->createRealPacket(*inPacket) == -1) {
                    delete inPacket;
                } else {
                    meridProcess->addOutPacket(inPacket);
                }
                delete retErr;  // Done with RetError
                finished = true;
            } else if (queryType == RET_INFO) {
                RetInfo* curRetInfo =
                    RetInfo::parse(in_remote, inPacket, packetSize);
                if (curRetInfo == NULL) {
                    ERROR_LOG("Malformed packet received\n");
                    return -1;
                }
                RealPacket* inPacket = new RealPacket(srcNode);
                if (curRetInfo->createRealPacket(*inPacket) == -1) {
                    delete inPacket;
                } else {
                    meridProcess->addOutPacket(inPacket);
                }
                delete curRetInfo;
            }
        }
    }
    return 0;
}

int HandleClosestGeneric::getMaxAndMinAndAverage(
        const map<NodeIdent, u_int, ltNodeIdent>& inMap,
        u_int* maxValue, u_int* minValue, u_int* avgValue) {
    if (inMap.size() == 0) {
        return -1;
    } else if (inMap.size() == 1) {
        *maxValue = inMap.begin()->second;
        *minValue = inMap.begin()->second;
        *avgValue = inMap.begin()->second;
    } else {
        *maxValue = 0;
        *minValue = UINT_MAX;
        double totalValue = 0.0;
        map<NodeIdent, u_int, ltNodeIdent>::const_iterator mapIt = inMap.begin();
        for (; mapIt != inMap.end(); mapIt++) {
            totalValue += mapIt->second;
            if (mapIt->second > (*maxValue)) {
                *maxValue = mapIt->second;
            }
            if (mapIt->second < (*minValue)) {
                *minValue = mapIt->second;
            }
        }
        // The following is just being very careful, probably not necessary
        long long tmpLL = llround(totalValue / inMap.size());
        if (tmpLL > UINT_MAX) {
            *avgValue = UINT_MAX;
        } else if (tmpLL <= 0) {
            *avgValue = 0;
        } else {
            *avgValue = (u_int) tmpLL;
        }
    }
    return 0;
}
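
/*
 * Worked example for the averaging above (hypothetical latencies): with
 * per-node values {12000, 20000, 31000} us, totalValue = 63000.0 and
 * llround(63000.0 / 3) = 21000, so *avgValue = 21000. The clamps to
 * [0, UINT_MAX] are the "just being very careful" guards mentioned in the
 * comment above.
 */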
int HandleClosestGeneric::getMaxAndAverage(
        const map<NodeIdent, u_int, ltNodeIdent>& inMap,
        u_int* maxValue, u_int* avgValue) {
    if (inMap.size() == 0) {
        return -1;
    } else if (inMap.size() == 1) {
        *maxValue = inMap.begin()->second;
        *avgValue = inMap.begin()->second;
    } else {
        *maxValue = 0;
        double totalValue = 0.0;
        map<NodeIdent, u_int, ltNodeIdent>::const_iterator mapIt = inMap.begin();
        for (; mapIt != inMap.end(); mapIt++) {
            totalValue += mapIt->second;
            if (mapIt->second > (*maxValue)) {
                *maxValue = mapIt->second;
            }
        }
        // The following is just being very careful, probably not necessary
        long long tmpLL = llround(totalValue / inMap.size());
        if (tmpLL > UINT_MAX) {
            *avgValue = UINT_MAX;
        } else if (tmpLL <= 0) {
            *avgValue = 0;
        } else {
            *avgValue = (u_int) tmpLL;
        }
    }
    return 0;
}

int HandleClosestGeneric::handleForward() {
    // We have all the information necessary to make a forwarding decision
    u_int lowestLatUS = UINT_MAX;
    NodeIdent closestMember = {0, 0};
    // Find the lowest latency node by iterating through the ring members
    // that have returned results back
    map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>::iterator
        it = ringLatencies.begin();
    for (; it != ringLatencies.end(); it++) {
        u_int dummy, curAvgLatency;
        if (getMaxAndAverage(*(it->second), &dummy, &curAvgLatency) == -1) {
            ERROR_LOG("Incorrect MAX/AVG calculation\n");
            continue;
        }
        if (curAvgLatency < lowestLatUS) {
            lowestLatUS = curAvgLatency;
            closestMember = it->first;
        }
    }
    double betaRatio = ((double)betaNumer) / ((double)betaDenom);
    if (betaRatio <= 0.0 || betaRatio >= 1.0) {
        ERROR_LOG("Illegal beta parameter\n");