📄 query.cpp
字号:
/******************************************************************************Meridian prototype distributionCopyright (C) 2005 Bernard WongThis program is free software; you can redistribute it and/ormodify it under the terms of the GNU General Public Licenseas published by the Free Software Foundation; either version 2of the License, or (at your option) any later version.This program is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See theGNU General Public License for more details.You should have received a copy of the GNU General Public Licensealong with this program; if not, write to the Free SoftwareFoundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.The copyright owner can be contacted by e-mail at bwong@cs.cornell.edu*******************************************************************************/using namespace std;#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <math.h>#include <ucontext.h>#include "Marshal.h"#include "Query.h"#include "RingSet.h"#include "MeridianProcess.h"#include "GramSchmidtOpt.h"AddNodeQuery::AddNodeQuery(const NodeIdentRendv& in_remote, MeridianProcess* in_process) : remoteNode(in_remote), finished(false), meridProcess(in_process) { qid = meridProcess->getNewQueryID(); computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); }int AddNodeQuery::handleLatency( const vector<NodeIdentLat>& in_remoteNodes) { // There must be the srcNode as the first entry, and the target node // (itself) as the second entry if (in_remoteNodes.size() != 2) { ERROR_LOG("Received RET_PING, but not correct size\n"); return -1; } if (in_remoteNodes[0].addr != remoteNode.addr || in_remoteNodes[0].port != remoteNode.port) { ERROR_LOG("Received packet from unexpected node\n"); return -1; } u_int latencyUS = in_remoteNodes[1].latencyUS; NodeIdent mainRemoteNode = {remoteNode.addr, remoteNode.port}; NodeIdent rendvRemoteNode = {remoteNode.addrRendv, remoteNode.portRendv}; meridProcess->getRings()->insertNode( mainRemoteNode, latencyUS, rendvRemoteNode); NodeIdentLat outNIL = {remoteNode.addr, remoteNode.port, latencyUS}; vector<NodeIdentLat> subVect; subVect.push_back(outNIL); for (u_int i = 0; i < subscribers.size(); i++) { meridProcess->getQueryTable()->notifyQLatency(subscribers[i], subVect); } WARN_LOG("Add node query handled successfully\n"); // Adding to cache NodeIdent curIdent = {remoteNode.addr, remoteNode.port}; meridProcess->pingCacheInsert(curIdent, latencyUS); finished = true; return 0;}int AddNodeQuery::handleEvent( const NodeIdent& in_remote, const char* inPacket, int packetSize) { if (inPacket[0] != PONG) { ERROR_LOG("Expecting PONG packet, received something else\n"); return -1; // Not pong packet } if (in_remote.addr != remoteNode.addr || in_remote.port != remoteNode.port) { ERROR_LOG("Received packet from unexpected node\n"); return -1; } struct timeval curTime; gettimeofday(&curTime, NULL); u_int latencyUS = (curTime.tv_sec - startTime.tv_sec) * MICRO_IN_SECOND + curTime.tv_usec - startTime.tv_usec; NodeIdent mainRemoteNode = {remoteNode.addr, remoteNode.port}; NodeIdent rendvRemoteNode = {remoteNode.addrRendv, remoteNode.portRendv}; meridProcess->getRings()->insertNode( mainRemoteNode, latencyUS, rendvRemoteNode); //meridProcess->getRings()->insertNode(remoteNode, latencyUS); NodeIdentLat outNIL = {remoteNode.addr, remoteNode.port, latencyUS}; vector<NodeIdentLat> subVect; subVect.push_back(outNIL); for (u_int i = 0; i < subscribers.size(); i++) { meridProcess->getQueryTable()->notifyQLatency(subscribers[i], subVect); } // Adding to cache NodeIdent curIdent = {remoteNode.addr, remoteNode.port}; meridProcess->pingCacheInsert(curIdent, latencyUS); finished = true; return 0;}int AddNodeQuery::handleTimeout() { WARN_LOG("######################### QUERY TIMEOUT ###################\n"); NodeIdent tmpIdent = {remoteNode.addr, remoteNode.port}; meridProcess->getRings()->eraseNode(tmpIdent); finished = true; return 0;}int AddNodeQuery::init() { gettimeofday(&startTime, NULL); // Firewall support for AddNodeQuery is special. If target is behind // a firewall, instead of sending a pushed packet, we perform a req-ping NodeIdent rendvInfo = meridProcess->returnRendv(); // If the target is not behind a firewall if (remoteNode.addrRendv == 0 && remoteNode.portRendv == 0) { PingPacket pingPacket(qid); RealPacket* inPacket = new RealPacket(remoteNode); if (pingPacket.createRealPacket(*inPacket) == -1) { delete inPacket; return -1; } meridProcess->addOutPacket(inPacket); } else { // Target is behind a firewall if (rendvInfo.addr == 0 && rendvInfo.port == 0) { // But we're not, so perform a ReqProbe NodeIdent emptyIdent = {0, 0}; set<NodeIdent, ltNodeIdent> tmpSet; tmpSet.insert(emptyIdent); ReqProbePing* newQuery = new ReqProbePing(remoteNode, tmpSet, meridProcess); if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) { delete newQuery; } else { newQuery->subscribeLatency(qid); newQuery->init(); } } // else we're both behind firewall, we give up and wait for timeout } return 0;}int AddNodeQuery::subscribeLatency(uint64_t in_qid) { subscribers.push_back(in_qid); return 0; }//GossipQuery::GossipQuery(NodeIdent& in_remote,GossipQuery::GossipQuery(NodeIdentRendv& in_remote, MeridianProcess* in_process) : remoteNode(in_remote), finished(false), meridProcess(in_process) { qid = meridProcess->getNewQueryID(); computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);}int GossipQuery::init() { WARN_LOG("Starting gossip query\n"); gettimeofday(&startTime, NULL);#ifdef DEBUG u_int netAddr = htonl(remoteNode.addr); WARN_LOG_2("Sending gossip to node %s:%d\n", inet_ntoa(*(struct in_addr*)&netAddr), remoteNode.port);#endif AddNodeQuery* newQuery = new AddNodeQuery(remoteNode, meridProcess); if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) { delete newQuery; return -1; } newQuery->subscribeLatency(qid); newQuery->init(); return 0;}int GossipQuery::handleTimeout() { finished = true; return 0;}int GossipQuery::fillGossipPacket(GossipPacketGeneric& in_packet, const NodeIdentRendv& in_target, MeridianProcess* in_merid) { vector<NodeIdentRendv> randomNodes; in_merid->getRings()->getRandomNodes(randomNodes); // Return okay even if the gossip packet itself is empty for (u_int i = 0; i < randomNodes.size(); i++) { NodeIdentRendv curR = randomNodes[i]; // Don't send remote node itself if (curR.addr != in_target.addr || curR.port != in_target.port) { in_packet.addNode( curR.addr, curR.port, curR.addrRendv, curR.portRendv); } } return 0;}int GossipQuery::handleLatency( const vector<NodeIdentLat>& in_remoteNodes) {// const map<NodeIdent, u_int, ltNodeIdent>& in_remoteNodes) {// const NodeIdent& in_remote, u_int latency_us) { if (in_remoteNodes.size() != 1) { return -1; } NodeIdent in_remote = {in_remoteNodes[0].addr, in_remoteNodes[0].port}; // u_int latency_us = tmpMapIt->second; // Check to see whether it is the expected addr if (in_remote.addr != remoteNode.addr || in_remote.port != remoteNode.port) { ERROR_LOG("Received packet from unexpected node\n"); return -1; } // Now send a gossip packet NodeIdent tmpRendvNode = meridProcess->returnRendv(); GossipPacketPush gPacket(qid, tmpRendvNode.addr, tmpRendvNode.port);/* vector<NodeIdentRendv> randomNodes; meridProcess->getRings()->getRandomNodes(randomNodes); if (randomNodes.size() > 0 ) { for (u_int i = 0; i < randomNodes.size(); i++) { NodeIdentRendv curR = randomNodes[i]; // Don't send remote node itself if (curR.addr != remoteNode.addr || curR.port != remoteNode.port) { gPacket.addNode( curR.addr, curR.port, curR.addrRendv, curR.portRendv); } }*/ if (fillGossipPacket(gPacket, remoteNode, meridProcess) == 0) { WARN_LOG("Creating gossip packet ###############\n"); RealPacket* inPacket = new RealPacket(remoteNode); if (gPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { meridProcess->addOutPacket(inPacket); } } finished = true; return 0; }void QueryScheduler::computeSchedTimeout() { if (numInitInterval > 0) { computeTimeout(initInterval_MS * MICRO_IN_MILLI, &timeoutTV); numInitInterval--; } else { computeTimeout(ssInterval_MS * MICRO_IN_MILLI, &timeoutTV); } } QueryScheduler::QueryScheduler(u_int in_initInterval_MS, u_int in_numInitInterval, u_int in_ssInterval_MS, MeridianProcess* in_process, SchedObject* in_schedObj) : schedObj(in_schedObj), initInterval_MS(in_initInterval_MS), numInitInterval(in_numInitInterval), ssInterval_MS(in_ssInterval_MS), meridProcess(in_process), finished(false) { qid = meridProcess->getNewQueryID(); computeSchedTimeout();}int QueryScheduler::handleTimeout() { WARN_LOG("QueryScheduler activated\n"); schedObj->runOnce(); computeSchedTimeout(); return 0; }int QueryScheduler::removeScheduler() { finished = true; meridProcess->getQueryTable()->updateTimeout(this); return 0;}int SchedGossip::runOnce() { meridProcess->performGossip(); return 0; }int SchedRingManage::runOnce() { meridProcess->performRingManagement(); return 0; }RingManageQuery::RingManageQuery(int in_ringNum, MeridianProcess* in_process) : ringNum(in_ringNum), finished(false), meridProcess(in_process) { qid = meridProcess->getNewQueryID(); computeTimeout(2 * MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV); }int RingManageQuery::init() { NodeIdent dummy; gettimeofday(&startTime, NULL); if (ringNum < 0 || (ringNum >= meridProcess->getRings()->getNumberOfRings())) { return -1; } meridProcess->getRings()->membersDump(ringNum, remoteNodes); meridProcess->getRings()->freezeRing(ringNum); // Create Req packets to send to each one // Only send packets to nodes that are not behind firewalls set<NodeIdent, ltNodeIdent>::iterator outerIt = remoteNodes.begin(); for (; outerIt != remoteNodes.end(); outerIt++) { if (meridProcess->getRings()->rendvLookup(*outerIt, dummy) != -1) { continue; // Requires rendavous, don't add } NodeIdent tmpRendvNode = meridProcess->returnRendv(); ReqMeasurePing req(qid, tmpRendvNode.addr, tmpRendvNode.port); set<NodeIdent, ltNodeIdent>::iterator innerIt = remoteNodes.begin(); for (; innerIt != remoteNodes.end(); innerIt++) { if (((outerIt->addr == innerIt->addr) && (outerIt->port == innerIt->port)) || (meridProcess->getRings()->rendvLookup(*innerIt, dummy) != -1)){ continue; } req.addTarget(*innerIt); } RealPacket* inPacket = new RealPacket(*outerIt); if (req.createRealPacket(*inPacket) == -1) { delete inPacket; continue; } meridProcess->addOutPacket(inPacket); } return 0;}int RingManageQuery::handleEvent( const NodeIdent& in_remote, const char* inPacket, int packetSize) { if (inPacket[0] != RET_PING_REQ) { ERROR_LOG("Expecting RET_PING_REQ, received somthing else\n"); return -1; // Not RET_PING_REQ packet } if (remoteNodes.find(in_remote) == remoteNodes.end()) { ERROR_LOG("Received packet from unexpected node\n"); return -1; } else { if (RetNodeMap.find(in_remote) != RetNodeMap.end()) { ERROR_LOG("Node already reported sent a RET_PING_REQ\n"); return -1; } } RetPing* ret = RetPing::parse(inPacket, packetSize); if (ret == NULL) { ERROR_LOG("RET_PING_REQ Ill-formed\n"); return -1; } map<NodeIdent, u_int, ltNodeIdent>* newMap = new map<NodeIdent, u_int, ltNodeIdent>(); const vector<NodeIdentLat>* retNodes = ret->returnNodes(); for (u_int i = 0; i < retNodes->size(); i++) { NodeIdent tmp = {(*retNodes)[i].addr, (*retNodes)[i].port}; if (remoteNodes.find(tmp) != remoteNodes.end()) { (*newMap)[tmp] = (*retNodes)[i].latencyUS; } } RetNodeMap[in_remote] = newMap; WARN_LOG_2("remoteNodes has %d entries, RetNodeMap has %d entries\n", remoteNodes.size(), RetNodeMap.size()); if (RetNodeMap.size() == remoteNodes.size()) { WARN_LOG("^^^^^^^^^^^^Receive all RET_PING^^^^^^^^^^^^^^^^\n"); // Done, perform ring management meridProcess->getRings()->unfreezeRing(ringNum); performReplacement(); finished = true; } delete ret; // Done with packet return 0;}int RingManageQuery::removeCandidateNode(const NodeIdent& in_node) { // Remove in_node from both RetNodeMap and remoteNodes map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>:: iterator retNodeIt = RetNodeMap.find(in_node); if (retNodeIt != RetNodeMap.end()) { delete retNodeIt->second; RetNodeMap.erase(retNodeIt); } remoteNodes.erase(in_node); // Iterate through all entries and remove in_node from it retNodeIt = RetNodeMap.begin(); for (; retNodeIt != RetNodeMap.end(); retNodeIt++) { // Erase all bade nodes from map (retNodeIt->second)->erase(in_node); } return 0;}int RingManageQuery::handleTimeout() { meridProcess->getRings()->unfreezeRing(ringNum); set<NodeIdent, ltNodeIdent>::iterator it = remoteNodes.begin(); vector<NodeIdent> badNodes; for (; it != remoteNodes.end(); it++) { if (RetNodeMap.find(*it) == RetNodeMap.end()) { // Did not receive response back from node, delete it WARN_LOG("############## REQ_PING TIMEOUT ###############\n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -