⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 query.cpp

📁 件主要用于帮助计算机爱好者学习蚁群算法时做有关蚁群算法的试验。蚁群算法作为一种优秀的新兴的算法
💻 CPP
📖 第 1 页 / 共 5 页
字号:
ReqProbeGeneric::ReqProbeGeneric(const NodeIdentRendv& in_src_node,						const set<NodeIdentConst, ltNodeIdentConst>& in_remote, 						MeridianProcess* in_process)		: 	srcNode(in_src_node), finished(false), meridProcess(in_process) {	qid = meridProcess->getNewQueryID();	computeTimeout(2 * MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);	//	Copy all targets over	set<NodeIdentConst, ltNodeIdentConst>::const_iterator it 		= in_remote.begin();	for (; it != in_remote.end(); it++) {		NodeIdent tmp = {it->addr, it->port};		remoteNodes.insert(tmp);			}}int ReqProbeGeneric::handleEvent(		const NodeIdent& in_remote, const char* inPacket, int packetSize) {			if (inPacket[0] != RET_PING_REQ) {		ERROR_LOG("Expecting RET_PING_REQ packet, received something else\n");		return -1;	// Not pong packet	}		if (in_remote.addr != srcNode.addr || 		in_remote.port != srcNode.port) {		ERROR_LOG("Received packet from unexpected node\n");		return -1;	}	RetPing* newRetPing = RetPing::parse(inPacket, packetSize);	if (newRetPing == NULL) {		ERROR_LOG("Incorrect packet received\n");		return -1;	}		const vector<NodeIdentLat>* tmpVectLat = newRetPing->returnNodes();	//	Not all the nodes are there	if (tmpVectLat->size() != remoteNodes.size()) {		ERROR_LOG("Only partial list of nodes returned\n");		delete newRetPing;		return -1;	}	vector<NodeIdentLat> newTmpVect;	//	HACK: Add srcNode to the vector before telling subscriber	NodeIdentLat outNIL = {srcNode.addr, srcNode.port, 0};	newTmpVect.push_back(outNIL);	for (u_int i = 0; i < tmpVectLat->size(); i++) {		newTmpVect.push_back((*tmpVectLat)[i]);		}	//	Tell subscribers	for (u_int i = 0; i < subscribers.size(); i++) {				meridProcess->getQueryTable()->notifyQLatency(			subscribers[i], newTmpVect);		}	delete newRetPing;	// 	Done with packet	finished = true;	//	Done with query	return 0;}int ReqProbeGeneric::handleTimeout() {	NodeIdent tmpIdent = {srcNode.addr, srcNode.port};	meridProcess->getRings()->eraseNode(tmpIdent);	finished = true;	return 0;		}int ReqProbeGeneric::subscribeLatency(uint64_t in_qid) {	subscribers.push_back(in_qid);	return 0;}HandleMCGeneric::HandleMCGeneric(uint64_t id,							u_short in_betaNumer, u_short in_betaDenom,							const NodeIdentRendv& in_srcNode, 							const vector<NodeIdentConst>& in_remote, 							MeridianProcess* in_process)		: 	qid(id), betaNumer(in_betaNumer), betaDenom(in_betaDenom),			srcNode(in_srcNode), finished(false), meridProcess(in_process) {	selectedMember.addr = 0;	selectedMember.port = 0;					computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);	//	Copy all targets over	for (u_int i = 0; i < in_remote.size(); i++) {		remoteNodes.insert(in_remote[i]);				}	stateMachine = HMC_INIT;}int HandleMCGeneric::init() {	//gettimeofday(&startTime, NULL);	set<NodeIdentConst, ltNodeIdentConst>::iterator it = remoteNodes.begin();	for (; it != remoteNodes.end(); it++) {		NodeIdent tmp = {it->addr, it->port};		uint32_t curLatencyUS;		if (getLatency(tmp, &curLatencyUS) == -1) {					ProbeQueryGeneric* newQuery =				createProbeQuery(tmp, meridProcess);			if (meridProcess->getQueryTable()->insertNewQuery(newQuery) == -1) {							delete newQuery;				continue;			}			newQuery->subscribeLatency(qid);			newQuery->init();		} else {			remoteLatencies[tmp] = curLatencyUS;				}	}	RetInfo curRetInfo(qid, 0, 0);	//	Send back an intermediate info packet	RealPacket* inPacket = new RealPacket(srcNode);	if (curRetInfo.createRealPacket(*inPacket) == -1) {		delete inPacket;				} else {		meridProcess->addOutPacket(inPacket);	}	stateMachine = HMC_WAIT_FOR_DIRECT_PING;	if (remoteLatencies.size() == remoteNodes.size()) {		//	It's done, tell the query it is		vector<NodeIdentLat> dummy;		getMerid()->getQueryTable()->notifyQLatency(getQueryID(), dummy);	}		return 0;	}int HandleMCGeneric::handleEvent(		const NodeIdent& in_remote, 		const char* inPacket, int packetSize) {	// Forward certain types of packets backwards,	// such as RET_RESPONSE, in which case set finished = true					if (stateMachine == HMC_WAIT_FOR_FIN) {		BufferWrapper rb(inPacket, packetSize);				char queryType;	uint64_t queryID;		if (Packet::parseHeader(rb, &queryType, &queryID) == -1) {							assert(false); // This should not be possible		}				if (queryType == getQueryType()) {						//	Query may have gone in a loop, let just say we are the closest			WARN_LOG("WARNING: Query may have gone in a loop\n");			RetResponse retPacket(qid, 0, 0, remoteLatencies);			//	NOTE: Don't need to use rendavous node as there should be a 			//	hole in the NAT to in_remote as we just received the packet			RealPacket* inPacket = new RealPacket(in_remote);			if (retPacket.createRealPacket(*inPacket) == -1) {				delete inPacket;						} else {				meridProcess->addOutPacket(inPacket);			}			} else if ((in_remote.addr == selectedMember.addr) &&				(in_remote.port == selectedMember.port)) {			WARN_LOG("Received packet from selected ring member\n");						if (queryType == RET_RESPONSE) {				RetResponse* retResp = 					RetResponse::parse(in_remote, inPacket, packetSize);				if (retResp == NULL) {					ERROR_LOG("Malformed packet received\n");					return -1;				}					RealPacket* inPacket = new RealPacket(srcNode);				if (retResp->createRealPacket(*inPacket) == -1) {					delete inPacket;							} else {					meridProcess->addOutPacket(inPacket);				}				delete retResp;	// Done with RetResponse				finished = true;					} else if (queryType == RET_ERROR) {				RetError* retErr = RetError::parse(inPacket, packetSize);				if (retErr == NULL) {					ERROR_LOG("Malformed packet received\n");					return -1;				}				RealPacket* inPacket = new RealPacket(srcNode);				if (retErr->createRealPacket(*inPacket) == -1) {					delete inPacket;							} else {					meridProcess->addOutPacket(inPacket);				}				delete retErr;	// Done with RetResponse				finished = true;											} else if (queryType == RET_INFO) {				RetInfo* curRetInfo 					= RetInfo::parse(in_remote, inPacket, packetSize);				if (curRetInfo == NULL) {					ERROR_LOG("Malformed packet received\n");					return -1;									}				RealPacket* inPacket = new RealPacket(srcNode);				if (curRetInfo->createRealPacket(*inPacket) == -1) {					delete inPacket;							} else {					meridProcess->addOutPacket(inPacket);				}				delete curRetInfo;					}		}			}	return 0;			}int HandleMCGeneric::handleForward() {	//	We have all the information necessary to make a forwarding decision	u_int lowestLatUS = UINT_MAX;	NodeIdent closestMember = {0, 0};	//	For the lowest latency node by iterating through the ring members	//	that have returned results back	map<NodeIdent, map<NodeIdent, u_int, ltNodeIdent>*, ltNodeIdent>::		iterator it = ringLatencies.begin();		for (; it != ringLatencies.end(); it++) {		u_int curAvgLatency;		if (getAvgSolution(*(it->second), &curAvgLatency) == -1) {			ERROR_LOG("Incorrect average solution calculation\n");			continue;					}		if (curAvgLatency < lowestLatUS) {			lowestLatUS = curAvgLatency;			closestMember = it->first;		}	}		double betaRatio = ((double)betaNumer) / ((double)betaDenom);	if (betaRatio <= 0.0 || betaRatio >= 1.0) {		ERROR_LOG("Illegal beta parameter\n"); 		betaRatio = 0.5;	}	//	Calculate the forwarding threshold	u_int latencyThreshold = 0;		long long tmpLatThreshold_ll = llround(betaRatio * (double)averageLatUS);	//	Just to be extra careful with rounding	if (tmpLatThreshold_ll > UINT_MAX) {		latencyThreshold = UINT_MAX;	} else if (tmpLatThreshold_ll < 0) {		latencyThreshold = 0;		} else {		latencyThreshold = (u_int)tmpLatThreshold_ll;	}	WARN_LOG_1("Latency threshold is %d\n", latencyThreshold);	WARN_LOG_1("Lowest latency is %d\n", lowestLatUS);	WARN_LOG_1("My latency is %d\n", averageLatUS);	if (lowestLatUS == 0 || lowestLatUS > latencyThreshold) {		//	Did not meet the threshold, return closest so far		RetResponse* retResp = NULL;		//	Pick the closest we know right now and return		if (lowestLatUS < averageLatUS) {			// Reusing iterator it			it = ringLatencies.find(closestMember);			assert(it != ringLatencies.end());			retResp = new RetResponse(qid, closestMember.addr, 				closestMember.port, *(it->second));		} else {			//	Itself is the closest			retResp = new RetResponse(qid, 0, 0, remoteLatencies);		}		RealPacket* inPacket = new RealPacket(srcNode);		if (retResp->createRealPacket(*inPacket) == -1) {			delete inPacket;					} else {			meridProcess->addOutPacket(inPacket);		}		delete retResp;	// Done with RetResponse		finished = true;					} else {#ifdef DEBUG				u_int netAddr = htonl(closestMember.addr);		char* remoteString = inet_ntoa(*(struct in_addr*)&(netAddr));					WARN_LOG_2("Forwarding to closest member %s:%d\n", 			remoteString, closestMember.port);#endif						NodeIdent tmpRendvNode = meridProcess->returnRendv();				ReqConstraintGeneric* reqMC = createReqConstraint(qid, betaNumer, 			betaDenom, tmpRendvNode.addr, tmpRendvNode.port);		set<NodeIdentConst, ltNodeIdentConst>::iterator it 			= remoteNodes.begin();		for (; it != remoteNodes.end(); it++) {			reqMC->addTarget(*it);			}				NodeIdentRendv tmpRendvOut 			= {closestMember.addr, closestMember.port, 0, 0};		set<NodeIdentRendv, ltNodeIdentRendv>::iterator setRendvIt 			= ringMembers.find(tmpRendvOut);		if (setRendvIt == ringMembers.end()) {			ERROR_LOG("Data in HandleClosestGeneric inconsistent\n");		} else {			// This gets the rendavous data			tmpRendvOut = *setRendvIt;		}						RealPacket* inPacket = new RealPacket(tmpRendvOut);							//RealPacket* inPacket = new RealPacket(closestMember);		if (reqMC->createRealPacket(*inPacket) == -1) {			delete inPacket;			finished = true;				} else {			selectedMember = closestMember;			meridProcess->addOutPacket(inPacket);			computeTimeout(MAX_RTT_MS * MICRO_IN_MILLI, &timeoutTV);			stateMachine = HMC_WAIT_FOR_FIN;		}		delete reqMC;	}	return 0;}int HandleMCGeneric::sendReqProbes() {	if (stateMachine != HMC_WAIT_FOR_DIRECT_PING || 			remoteLatencies.size() != remoteNodes.size()) {		return -1;	// More results needed	}			//	Find the longest and average time in the remoteLatenties map	u_int largestAddUS;	if (getMaxSolution(remoteLatencies, &largestAddUS, false) == -1) {		ERROR_LOG("Incorrect MAX calculation\n");		finished = true;		return -1;					}		u_int largestSubUS;	if (getMaxSolution(remoteLatencies, &largestSubUS, true) == -1) {		ERROR_LOG("Incorrect MAX calculation\n");		finished = true;		return -1;					}				if (getAvgSolution(remoteLatencies, &averageLatUS) == -1) {		ERROR_LOG("Incorrect AVG calculation\n");		finished = true;		return -1;	}				WARN_LOG_1("Largest latency is %d\n", largestAddUS);	double betaRatio = ((double)betaNumer) / ((double)betaDenom);	WARN_LOG_1("Beta ratio is %0.2f\n", betaRatio);	if (betaRatio <= 0.0 || betaRatio >= 1.0) {		ERROR_LOG("Illegal beta parameter\n"); 		betaRatio = 0.5;	}	//	Update timeout	u_int newTimeoutPeriod = (u_int) ceil(		((2.0 * betaRatio) + 1.0) * (double)largestAddUS);			computeTimeout(newTimeoutPeriod, &timeoutTV);	//	Change to next state				stateMachine = HMC_INDIRECT_PING;	// Have to worry about 0 latencies for multiconstraint				if ((averageLatUS == 0) || 		(meridProcess->getRings()->fillVector(largestSubUS, 			largestAddUS, betaRatio, ringMembers) == -1) || 		(ringMembers.size() == 0)) {						// 0, 0 means itself								RetResponse retPacket(qid, 0, 0, remoteLatencies);			RealPacket* inPacket = new RealPacket(srcNode);		if (retPacket.createRealPacket(*inPacket) == -1) {			delete inPacket;					} else {			meridProcess->addOutPacket(inPacket);		}		finished = true;		return 0;	}							//	Send a ReqTCPProbeAverage to each of the ring members	set<NodeIdentRendv, ltNodeIdentRendv>::iterator setIt		= ringMembers.begin();	for (; setIt != ringMembers.end(); setIt++) {/*				NodeIdent rendvIdent;		NodeIdentRendv outIdent = {setIt->addr, setIt->port, 0, 0};		if (meridProcess->getRings()->rendvLookup(*setIt, rendvIdent) != -1) {			outIdent.portRendv = rendvIdent.port;			outIdent.addrRendv = rendvIdent.addr;		}				ReqProbeGeneric* newQuery =			createReqProbe(outIdent, remoteNodes, meridProcess);*/		ReqProbeGeneric* newQuery =			createReqProbe(*setIt, remoteNodes, meridProcess);		if (meridProcess->getQueryTable()->insertNewQuery(				newQuery) == -1) {						delete newQuery;		} else {			newQuery->subscribeLatency(qid);			newQuery->init();		}	}	return 0;}int HandleMCGeneric::handleLatency(		const vector<NodeIdentLat>& in_remoteNodes) {	//	Determine action depending on state	switch (stateMachine) {	case HMC_INIT: {			return 0;	// Unexpected, just return 0		} break;	case HMC_WAIT_FOR_DIRECT_PING: {			if (remoteLatencies.size() == remoteNodes.size()) {								return sendReqProbes();				}			//	Must have at least one entry in vector unless all entries			//	are already accounted for								if (in_remoteNodes.size() != 1) {				return -1;					}			//	NOTE: NodeIdentConst is keyed only on the addr and port			//	so this will still match			NodeIdentConst in_remote_const				= {in_remoteNodes[0].addr, in_remoteNodes[0].port, 0};			NodeIdent in_remote				= {in_remote_const.addr, in_remote_const.port};							u_int latency_us = in_remoteNodes[0].latencyUS;					if (remoteNodes.find(in_r

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -