⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 query.cpp

📁 件主要用于帮助计算机爱好者学习蚁群算法时做有关蚁群算法的试验。蚁群算法作为一种优秀的新兴的算法
💻 CPP
📖 第 1 页 / 共 5 页
字号:
/******************************************************************************Meridian prototype distributionCopyright (C) 2005 Bernard WongThis program is free software; you can redistribute it and/ormodify it under the terms of the GNU General Public Licenseas published by the Free Software Foundation; either version 2of the License, or (at your option) any later version.This program is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See theGNU General Public License for more details.You should have received a copy of the GNU General Public Licensealong with this program; if not, write to the Free SoftwareFoundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.The copyright owner can be contacted by e-mail at bwong@cs.cornell.edu*******************************************************************************/using namespace std;#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <math.h>#include <ucontext.h>#include "Marshal.h"#include "Query.h"#include "RingSet.h"#include "MeridianProcess.h"#include "GramSchmidtOpt.h"AddNodeQuery::AddNodeQuery(const NodeIdentRendv& in_remote, 							MeridianProcess* in_process) 		: remoteNode(in_remote), finished(false), meridProcess(in_process) {		qid = meridProcess->getNewQueryID();	computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);		}int AddNodeQuery::handleLatency(		const vector<NodeIdentLat>& in_remoteNodes) {	//	There must be the srcNode as the first entry, and the target node	//	(itself) as the second entry	if (in_remoteNodes.size() != 2) {		ERROR_LOG("Received RET_PING, but not correct size\n");		return -1;			}	if (in_remoteNodes[0].addr != remoteNode.addr ||		in_remoteNodes[0].port != remoteNode.port) {		ERROR_LOG("Received packet from unexpected node\n");		return -1;	}		u_int latencyUS = in_remoteNodes[1].latencyUS;	NodeIdent mainRemoteNode = {remoteNode.addr, remoteNode.port};	NodeIdent rendvRemoteNode = {remoteNode.addrRendv, remoteNode.portRendv};		meridProcess->getRings()->insertNode(		mainRemoteNode, latencyUS, rendvRemoteNode);		NodeIdentLat outNIL = {remoteNode.addr, remoteNode.port, latencyUS};	vector<NodeIdentLat> subVect;	subVect.push_back(outNIL);	for (u_int i = 0; i < subscribers.size(); i++) {		meridProcess->getQueryTable()->notifyQLatency(subscribers[i], subVect);		}		WARN_LOG("Add node query handled successfully\n");		//	Adding to cache	NodeIdent curIdent = {remoteNode.addr, remoteNode.port};	meridProcess->pingCacheInsert(curIdent, latencyUS);			finished = true;	return 0;}int AddNodeQuery::handleEvent(		const NodeIdent& in_remote, const char* inPacket, int packetSize) {	if (inPacket[0] != PONG) {		ERROR_LOG("Expecting PONG packet, received something else\n");		return -1;	// Not pong packet	}	if (in_remote.addr != remoteNode.addr || 		in_remote.port != remoteNode.port) {		ERROR_LOG("Received packet from unexpected node\n");		return -1;	}	struct timeval curTime;			gettimeofday(&curTime, NULL);	u_int latencyUS = (curTime.tv_sec - startTime.tv_sec) * MICRO_IN_SECOND 		+ curTime.tv_usec - startTime.tv_usec;	NodeIdent mainRemoteNode = {remoteNode.addr, remoteNode.port};	NodeIdent rendvRemoteNode = {remoteNode.addrRendv, remoteNode.portRendv};		meridProcess->getRings()->insertNode(		mainRemoteNode, latencyUS, rendvRemoteNode);				//meridProcess->getRings()->insertNode(remoteNode, latencyUS);		NodeIdentLat outNIL = {remoteNode.addr, remoteNode.port, latencyUS};	vector<NodeIdentLat> subVect;	subVect.push_back(outNIL);	for (u_int i = 0; i < subscribers.size(); i++) {		meridProcess->getQueryTable()->notifyQLatency(subscribers[i], subVect);		}	//	Adding to cache	NodeIdent curIdent = {remoteNode.addr, remoteNode.port};	meridProcess->pingCacheInsert(curIdent, latencyUS);			finished = true;	return 0;}int AddNodeQuery::handleTimeout() {	WARN_LOG("######################### QUERY TIMEOUT ###################\n");	NodeIdent tmpIdent = {remoteNode.addr, remoteNode.port};		meridProcess->getRings()->eraseNode(tmpIdent);			finished = true;	return 0;}int AddNodeQuery::init() {	gettimeofday(&startTime, NULL);		//	Firewall support for AddNodeQuery is special. If target is behind	//	a firewall, instead of sending a pushed packet, we perform a req-ping	NodeIdent rendvInfo = meridProcess->returnRendv();		//	If the target is not behind a firewall		if (remoteNode.addrRendv == 0 && remoteNode.portRendv == 0) {		PingPacket pingPacket(qid);		RealPacket* inPacket = new RealPacket(remoteNode);		if (pingPacket.createRealPacket(*inPacket) == -1) {			delete inPacket;						return -1;		}		meridProcess->addOutPacket(inPacket);	} else {		// Target is behind a firewall		if (rendvInfo.addr == 0 && rendvInfo.port == 0) {			// But we're not, so perform a ReqProbe			NodeIdent emptyIdent = {0, 0};			set<NodeIdent, ltNodeIdent> tmpSet;						tmpSet.insert(emptyIdent);			ReqProbePing* newQuery =				new ReqProbePing(remoteNode, tmpSet, meridProcess); 			if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) {							delete newQuery;			} else {				newQuery->subscribeLatency(qid);				newQuery->init();			}		} 		// else we're both behind firewall, we give up and wait for timeout	}	return 0;}int AddNodeQuery::subscribeLatency(uint64_t in_qid) {	subscribers.push_back(in_qid);	return 0;		}//GossipQuery::GossipQuery(NodeIdent& in_remote,GossipQuery::GossipQuery(NodeIdentRendv& in_remote,						MeridianProcess* in_process) 		: 	remoteNode(in_remote), finished(false),			meridProcess(in_process) {	qid = meridProcess->getNewQueryID();	computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);}int GossipQuery::init() {	WARN_LOG("Starting gossip query\n");	gettimeofday(&startTime, NULL);#ifdef DEBUG		u_int netAddr = htonl(remoteNode.addr);	WARN_LOG_2("Sending gossip to node %s:%d\n",		 	inet_ntoa(*(struct in_addr*)&netAddr), remoteNode.port);#endif	AddNodeQuery* newQuery = new AddNodeQuery(remoteNode, meridProcess);	if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) {					delete newQuery;		return -1;	}	newQuery->subscribeLatency(qid);	newQuery->init();	return 0;}int GossipQuery::handleTimeout() {	finished = true;	return 0;}int GossipQuery::fillGossipPacket(GossipPacketGeneric& in_packet, 		const NodeIdentRendv& in_target, MeridianProcess* in_merid) {	vector<NodeIdentRendv> randomNodes;	in_merid->getRings()->getRandomNodes(randomNodes);	// Return okay even if the gossip packet itself is empty	for (u_int i = 0; i < randomNodes.size(); i++) {		NodeIdentRendv curR = randomNodes[i];		// Don't send remote node itself		if (curR.addr != in_target.addr || curR.port != in_target.port) {			in_packet.addNode(				curR.addr, curR.port, curR.addrRendv, curR.portRendv);		}			}	return 0;}int GossipQuery::handleLatency(		const vector<NodeIdentLat>& in_remoteNodes) {//		const map<NodeIdent, u_int, ltNodeIdent>& in_remoteNodes) {//		const NodeIdent& in_remote, u_int latency_us) {	if (in_remoteNodes.size() != 1) {		return -1;			}		NodeIdent in_remote = {in_remoteNodes[0].addr, in_remoteNodes[0].port};	// u_int latency_us = tmpMapIt->second; 	//	Check to see whether it is the expected addr	if (in_remote.addr != remoteNode.addr || 		in_remote.port != remoteNode.port) {		ERROR_LOG("Received packet from unexpected node\n");		return -1;	}				//	Now send a gossip packet	NodeIdent tmpRendvNode = meridProcess->returnRendv();	GossipPacketPush gPacket(qid, tmpRendvNode.addr, tmpRendvNode.port);/*		vector<NodeIdentRendv> randomNodes;	meridProcess->getRings()->getRandomNodes(randomNodes);		if (randomNodes.size() > 0 ) {		for (u_int i = 0; i < randomNodes.size(); i++) {			NodeIdentRendv curR = randomNodes[i];			// Don't send remote node itself			if (curR.addr != remoteNode.addr || curR.port != remoteNode.port) {				gPacket.addNode(					curR.addr, curR.port, curR.addrRendv, curR.portRendv);			}				}*/	if (fillGossipPacket(gPacket, remoteNode, meridProcess) == 0) {		WARN_LOG("Creating gossip packet ###############\n");		RealPacket* inPacket = new RealPacket(remoteNode);		if (gPacket.createRealPacket(*inPacket) == -1) {			delete inPacket;			} else {			meridProcess->addOutPacket(inPacket);		}	}	finished = true;	return 0;	}void QueryScheduler::computeSchedTimeout() {	if (numInitInterval > 0) {		computeTimeout(initInterval_MS * MICRO_IN_MILLI, &timeoutTV);		numInitInterval--;	} else {		computeTimeout(ssInterval_MS * MICRO_IN_MILLI, &timeoutTV);	}		}	QueryScheduler::QueryScheduler(u_int in_initInterval_MS, 			u_int in_numInitInterval, u_int in_ssInterval_MS, 			MeridianProcess* in_process, SchedObject* in_schedObj)		:	schedObj(in_schedObj), initInterval_MS(in_initInterval_MS), 			numInitInterval(in_numInitInterval), 			ssInterval_MS(in_ssInterval_MS), meridProcess(in_process),			finished(false) {	qid = meridProcess->getNewQueryID();	computeSchedTimeout();}int QueryScheduler::handleTimeout() {	WARN_LOG("QueryScheduler activated\n");	schedObj->runOnce();	computeSchedTimeout();	return 0;	}int QueryScheduler::removeScheduler() 	{ 	finished = true;	meridProcess->getQueryTable()->updateTimeout(this);	return 0;}int SchedGossip::runOnce() {	meridProcess->performGossip();	return 0;	}int SchedRingManage::runOnce() {	meridProcess->performRingManagement();	return 0;	}RingManageQuery::RingManageQuery(int in_ringNum, MeridianProcess* in_process) 		: 	ringNum(in_ringNum), finished(false), meridProcess(in_process) {	qid = meridProcess->getNewQueryID();		computeTimeout(2 * MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);	}int RingManageQuery::init() {	NodeIdent dummy;	gettimeofday(&startTime, NULL);	if (ringNum < 0 || 		(ringNum >= meridProcess->getRings()->getNumberOfRings())) {		return -1;			}	meridProcess->getRings()->membersDump(ringNum, remoteNodes);		meridProcess->getRings()->freezeRing(ringNum);		//	Create Req packets to send to each one	//	Only send packets to nodes that are not behind firewalls	set<NodeIdent, ltNodeIdent>::iterator outerIt = remoteNodes.begin();	for (; outerIt != remoteNodes.end(); outerIt++) {						if (meridProcess->getRings()->rendvLookup(*outerIt, dummy) != -1) {						continue; // Requires rendavous, don't add		}		NodeIdent tmpRendvNode = meridProcess->returnRendv();				ReqMeasurePing req(qid, tmpRendvNode.addr, tmpRendvNode.port);		set<NodeIdent, ltNodeIdent>::iterator innerIt = remoteNodes.begin();		for (; innerIt != remoteNodes.end(); innerIt++) {			if (((outerIt->addr == innerIt->addr) &&				(outerIt->port == innerIt->port)) || 				(meridProcess->getRings()->rendvLookup(*innerIt, dummy) != -1)){				continue;			}			req.addTarget(*innerIt);		}		RealPacket* inPacket = new RealPacket(*outerIt);		if (req.createRealPacket(*inPacket) == -1) {			delete inPacket;						continue;		}		meridProcess->addOutPacket(inPacket);							}	return 0;}int RingManageQuery::handleEvent(		const NodeIdent& in_remote, const char* inPacket, int packetSize) {	if (inPacket[0] != RET_PING_REQ) {		ERROR_LOG("Expecting RET_PING_REQ, received somthing else\n");		return -1;	// Not RET_PING_REQ packet	}	if (remoteNodes.find(in_remote) == remoteNodes.end()) {		ERROR_LOG("Received packet from unexpected node\n");		return -1;	} else {		if (RetNodeMap.find(in_remote) != RetNodeMap.end()) {			ERROR_LOG("Node already reported sent a RET_PING_REQ\n");			return -1;							}	}	RetPing* ret = RetPing::parse(inPacket, packetSize);	if (ret == NULL) {		ERROR_LOG("RET_PING_REQ Ill-formed\n");		return -1;	}	map<NodeIdent, u_int, ltNodeIdent>* newMap 		= new map<NodeIdent, u_int, ltNodeIdent>();		const vector<NodeIdentLat>* retNodes = ret->returnNodes();	for (u_int i = 0; i < retNodes->size(); i++) {		NodeIdent tmp = {(*retNodes)[i].addr, (*retNodes)[i].port};		if (remoteNodes.find(tmp) != remoteNodes.end()) {			(*newMap)[tmp] = (*retNodes)[i].latencyUS;				}					}	RetNodeMap[in_remote] = newMap;	WARN_LOG_2("remoteNodes has %d entries, RetNodeMap has %d entries\n",		remoteNodes.size(), RetNodeMap.size()); 	if (RetNodeMap.size() == remoteNodes.size()) {		WARN_LOG("^^^^^^^^^^^^Receive all RET_PING^^^^^^^^^^^^^^^^\n");		// Done, perform ring management		meridProcess->getRings()->unfreezeRing(ringNum);				performReplacement();		finished = true;	}	delete ret;	// Done with packet	return 0;}int RingManageQuery::removeCandidateNode(const NodeIdent& in_node) {	//	Remove in_node from both RetNodeMap and remoteNodes	map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>::		iterator retNodeIt = RetNodeMap.find(in_node);	if (retNodeIt != RetNodeMap.end()) {		delete retNodeIt->second;		RetNodeMap.erase(retNodeIt);	}		remoteNodes.erase(in_node);			//	Iterate through all entries and remove in_node from it	retNodeIt = RetNodeMap.begin();	for (; retNodeIt != RetNodeMap.end(); retNodeIt++) {		//	Erase all bade nodes from map		(retNodeIt->second)->erase(in_node);				}	return 0;}int RingManageQuery::handleTimeout() {	meridProcess->getRings()->unfreezeRing(ringNum);		set<NodeIdent, ltNodeIdent>::iterator it = remoteNodes.begin();	vector<NodeIdent> badNodes;	for (; it != remoteNodes.end(); it++) {		if (RetNodeMap.find(*it) == RetNodeMap.end()) {			// Did not receive response back from node, delete it			WARN_LOG("############## REQ_PING TIMEOUT ###############\n");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -