⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 meridianprocess.cpp

📁 件主要用于帮助计算机爱好者学习蚁群算法时做有关蚁群算法的试验。蚁群算法作为一种优秀的新兴的算法
💻 CPP
📖 第 1 页 / 共 4 页
字号:
		}		newQuery->init();			}	return 0;}int MeridianProcess::performRingManagement() {	//	Find all full rings	int numRings = g_rings->getNumberOfRings();	vector<int> eligibleRings;	for (int i = 0; i < numRings; i++) {		//	Test if the ring is eligible for ring management		if (g_rings->eligibleForReplacement(i)) {			eligibleRings.push_back(i);		}	}	if (eligibleRings.empty()) {		return 0;		}	// 	Pick a random eligible ring	int selectedRing = eligibleRings[rand() % eligibleRings.size()];	RingManageQuery* newQuery = new RingManageQuery(selectedRing, this);	if (g_queryTable.insertNewQuery(newQuery) == -1) {					delete newQuery;		return -1;	}	newQuery->init();		return 0;	}int MeridianProcess::addOutPacket(RealPacket* in_packet) {	g_outPacketList.push_back(in_packet);	FD_SET(g_meridSock, &g_writeSet);	g_maxFD = MAX(g_meridSock, g_maxFD); 	return 0;}void MeridianProcess::writePending() {	while (true) {		assert(!(g_outPacketList.empty()));		RealPacket* firstPacket = g_outPacketList.front();		if (performSend(g_meridSock, firstPacket) == -1) {			if (errno == EAGAIN || errno == EWOULDBLOCK) {													break; // Retry again later when ready to send			} else {				//	Let's just continute still, but remove this packet				ERROR_LOG("Error calling send\n");				}		}				g_outPacketList.pop_front();		delete firstPacket;	// Done with packet		if (g_outPacketList.empty()) {			FD_CLR(g_meridSock, &g_writeSet);			break;	// No more to send		}	}}int MeridianProcess::performSend(int sock, RealPacket* in_packet) {#ifdef DEBUG		u_int netAddr = htonl(in_packet->getAddr());	char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr));		WARN_LOG_3("Sending query to port number %s:%d of size %d\n", 		ringNodeStr, in_packet->getPort(), in_packet->getPayLoadSize());#endif			// Handle firewall host by wrapping it around a PUSH packet	if (in_packet->getRendvAddr() != 0 || in_packet->getRendvPort() != 0) {#ifdef DEBUG				u_int netAddr = htonl(in_packet->getRendvAddr());				char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr));				WARN_LOG_2("Redirecting to rendavous node, %s:%d\n", ringNodeStr, 			in_packet->getRendvPort());#endif					//	QID for push packet should never be used, just set it to 0		PushPacket pushPacket(0, in_packet->getAddr(), in_packet->getPort());		NodeIdent rendvNode 			= {in_packet->getRendvAddr(), in_packet->getRendvPort()};		//	This packet MUST not have a rendavous host		RealPacket tmpPacket(rendvNode);		if (pushPacket.createRealPacket(tmpPacket) == -1) {			ERROR_LOG("Cannot create PUSH packet\n");			return -1;		}		tmpPacket.append_packet(*in_packet);		if (!(tmpPacket.completeOkay())) {						ERROR_LOG("Cannot create PUSH packet\n");			return -1;		}		return performSend(sock, &tmpPacket);	}					struct sockaddr_in hostAddr;	//memset(&(hostAddr), '\0', sizeof(struct sockaddr_in));	hostAddr.sin_family         = AF_INET;	hostAddr.sin_port           = htons(in_packet->getPort());	hostAddr.sin_addr.s_addr    = htonl(in_packet->getAddr());	memset(&(hostAddr.sin_zero), '\0', 8);	int sendRet = sendto(sock, in_packet->getPayLoad(),		in_packet->getPayLoadSize(), 0,		(struct sockaddr*)&hostAddr, sizeof(struct sockaddr));	return sendRet;}int MeridianProcess::readPacket() {	char buf[MAX_UDP_PACKET_SIZE];	struct sockaddr_in theirAddr;	int addrLen = sizeof(struct sockaddr);	//	Perform actual recv on socket	int numBytes = recvfrom(g_meridSock, buf, MAX_UDP_PACKET_SIZE, 0,		(struct sockaddr*)&theirAddr, (socklen_t*)&addrLen);			if (numBytes == -1) {		perror("Error on recvfrom");		return -1;			}	NodeIdent remoteNode = {ntohl(theirAddr.sin_addr.s_addr), 							ntohs(theirAddr.sin_port) };								return handleNewPacket(buf, numBytes, remoteNode);}int MeridianProcess::handleNewPacket(		char* buf, int numBytes, const NodeIdent& remoteNode) {	BufferWrapper rb(buf, numBytes);			char queryType;	uint64_t queryID;	if (Packet::parseHeader(rb, &queryType, &queryID) != -1) {		switch (queryType) {			case PUSH: {					RealPacket* inPacket 						= PushPacket::parse(remoteNode, buf, numBytes);											NodeIdent destIdent 						= {inPacket->getAddr(), inPacket->getPort()};#ifdef DEBUG											u_int netAddr = htonl(destIdent.addr);					char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr));					WARN_LOG_2("PUSH dest is %s:%d\n", 						ringNodeStr, destIdent.port);#endif															map<NodeIdent, int, ltNodeIdent>::iterator rendvIt 						= g_rendvConnections.find(destIdent);					if (rendvIt == g_rendvConnections.end()) {						ERROR_LOG("Node not rendavous for target\n");						delete inPacket;																	} else {						map<int, list<RealPacket*>*>::iterator rendvQIt 							= g_rendvQueue.find(rendvIt->second);						if (rendvQIt == g_rendvQueue.end()) {							ERROR_LOG("Inconsistent rendavous state\n");							delete inPacket;							//	Let's try to fix it anyway 							g_rendvConnections.erase(rendvIt);						} else {							//	Push it into queue and then on fd in writeSet							rendvQIt->second->push_back(inPacket);							FD_SET(rendvQIt->first, &g_writeSet);							g_maxFD = MAX(rendvQIt->first, g_maxFD);						}					}									} break;			case PING: {					PongPacket pongPacket(queryID);					RealPacket* inPacket = new RealPacket(remoteNode);					if (pongPacket.createRealPacket(*inPacket) == -1) {						delete inPacket;											} else {						addOutPacket(inPacket);					}				} break;			case GOSSIP: 			case GOSSIP_PULL: {#ifdef DEBUG									if (queryType == GOSSIP) {						WARN_LOG("Received a GOSSIP packet\n");					} else {						WARN_LOG("Received a GOSSIP_PULL packet\n");					}#endif					GossipPacketGeneric* tmp = NULL;					if (queryType == GOSSIP) {											tmp = GossipPacketGeneric::							parse<GossipPacketPush>(buf, numBytes);					} else {						tmp = GossipPacketGeneric::							parse<GossipPacketPull>(buf, numBytes);					}					if (tmp != NULL) {						//	Add remote node to ring						NodeIdentRendv remoteNodeRendv = { 							remoteNode.addr, remoteNode.port, 							tmp->getRendvAddr(), tmp->getRendvPort() };#ifdef DEBUG													u_int netAddr = htonl(tmp->getRendvAddr());						char* ringNodeStr = 							inet_ntoa(*(struct in_addr*)&(netAddr));						WARN_LOG_2("Rendv in GOSSIP packet is %s:%d\n", 							ringNodeStr, tmp->getRendvPort());#endif													addNodeToRing(remoteNodeRendv);						// 	Add nodes in gossip packet to ring												const vector<NodeIdentRendv>* tmpVect 							= tmp->returnTargets();						for (u_int i = 0; i < tmpVect->size(); i++) {							//NodeIdentRendv tmpRendv = (*tmpVect)[i];							addNodeToRing((*tmpVect)[i]);						}#ifdef GOSSIP_PUSHPULL						if (queryType == GOSSIP) {							GossipPacketPull gPacket(getNewQueryID(), 									g_rendvNode.addr, g_rendvNode.port);							if (GossipQuery::fillGossipPacket(gPacket, 									remoteNodeRendv, this) == 0) {								WARN_LOG("Creating GOSSIP_PULL ###########\n");								RealPacket* inPacket = 									new RealPacket(remoteNodeRendv);								if (gPacket.createRealPacket(*inPacket) == -1) {									delete inPacket;									} else {									addOutPacket(inPacket);								}							}						}#endif												delete tmp;	// Finished with gossip packet					}									} break;#ifdef PLANET_LAB_SUPPORT							case REQ_CONSTRAINT_N_ICMP: {						handleMCReq<ReqConstraintICMP, HandleMCICMP>(						queryID, remoteNode, buf, numBytes);				} break;#endif							case REQ_CONSTRAINT_N_PING: {						handleMCReq<ReqConstraintPing, HandleMCPing>(						queryID, remoteNode, buf, numBytes);				} break;							case REQ_CONSTRAINT_N_DNS: {						handleMCReq<ReqConstraintDNS, HandleMCDNS>(						queryID, remoteNode, buf, numBytes);				} break;			case REQ_CONSTRAINT_N_TCP: {						handleMCReq<ReqConstraintTCP, HandleMCTCP>(						queryID, remoteNode, buf, numBytes);							} break;#ifdef PLANET_LAB_SUPPORT							case REQ_CLOSEST_N_ICMP: {						handleClosestReq<ReqClosestICMP, HandleClosestICMP>(						queryID, remoteNode, buf, numBytes);				} break;#endif							case REQ_CLOSEST_N_MERID_PING: {						handleClosestReq<ReqClosestMeridPing, HandleClosestPing>(						queryID, remoteNode, buf, numBytes);				} break;							case REQ_CLOSEST_N_DNS: {						handleClosestReq<ReqClosestDNS, HandleClosestDNS>(						queryID, remoteNode, buf, numBytes);				} break;			case REQ_CLOSEST_N_TCP: {						handleClosestReq<ReqClosestTCP, HandleClosestTCP>(						queryID, remoteNode, buf, numBytes);							} break;			case RET_RESPONSE: {					WARN_LOG("Received a RET_RESPONSE packet\n");					g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);				} break;							case RET_INFO: {					WARN_LOG("Received a RET_INFO packet\n");					g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);				} break;							case RET_ERROR: {					WARN_LOG("Received a RET_ERROR packet\n");					g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);				} break;							case PONG: {					WARN_LOG("Received PONG packet\n");										g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);				} break;						case REQ_MEASURE_N_MERID_PING: {					WARN_LOG("Received ReqMeasurePing packet\n");					handleMeasureReq<ReqMeasurePing, HandleReqPing>(						remoteNode, buf, numBytes);				} break;			case REQ_MEASURE_N_TCP: {					WARN_LOG("Received ReqMeasureTCP packet\n");					handleMeasureReq<ReqMeasureTCP, HandleReqTCP>(						remoteNode, buf, numBytes);				} break;			case REQ_MEASURE_N_DNS: {					WARN_LOG("Received ReqMeasureDNS packet\n");					handleMeasureReq<ReqMeasureDNS, HandleReqDNS>(						remoteNode, buf, numBytes);				} break;#ifdef PLANET_LAB_SUPPORT			case REQ_MEASURE_N_ICMP: {					WARN_LOG("Received ReqMeasureICMP packet\n");					handleMeasureReq<ReqMeasureICMP, HandleReqICMP>(						remoteNode, buf, numBytes);				} break;#endif			case RET_PING_REQ: {					WARN_LOG("Received a RET_PING\n");					g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);									} break;#ifdef MERIDIAN_DSL			case DSL_REPLY: {					g_queryTable.notifyQPacket(						queryID, remoteNode, buf, numBytes);				} break;			case DSL_REQUEST: {					ParserState* new_state = new ParserState();					if (new_state == NULL) {						break;	// Cannot create new parser state						}					DSLRequestPacket* inPacket 						= DSLRequestPacket::parse(new_state, buf, numBytes);					if (inPacket == NULL) {						delete new_state;						break;	// Cannot parse packet											}					//	Get necessary data from packet					NodeIdentRendv remoteNodeRendv = { 						remoteNode.addr, remoteNode.port, 						inPacket->getRendvAddr(), inPacket->getRendvPort() };					uint64_t prevID = inPacket->retReqID();					uint16_t q_timeout = inPacket->timeout_ms();					uint16_t q_ttl = inPacket->getTTL();					//printf("Timeout of %d ms\n", q_timeout);					delete inPacket;	// No longer needed					//	Drop packets that have too large TTLs					if (q_ttl > g_max_ttl) {						delete new_state;						break;											}										g_parser_line = 1;	// Reset line count for parser					if (yyparse((void*)new_state) == -1) {						delete new_state;						break;	// Parse error					}					if (new_state->save_context() == -1) {						delete new_state;						break;	// Error saving context					}										makecontext(new_state->get_context(), 						(void (*)())(&jmp_eval), 1, new_state);					DSLRecvQuery* newQ = new DSLRecvQuery(new_state, 						this, remoteNodeRendv, prevID, q_timeout, q_ttl);					if (newQ == NULL) {						delete new_state;						break; // Error creating new Query											}					//	Associate parser state with query id					new_state->setQuery(newQ);					//	Associate parser with this meridian process					new_state->setMeridProcess(this);					if (g_queryTable.insertNewQuery(newQ) == -1) {									delete newQ;	// State is deleted with newQ					} else {						newQ->init();						g_psList.push_back(newQ->getQueryID());						FD_SET(g_dummySock, &g_writeSet);						g_maxFD = MAX(g_dummySock, g_maxFD);					}							} break;#endif			default:				break;		}	}	return 0;}int MeridianProcess::getInfoPacket(RealPacket& inPacket) {	int pos = inPacket.getPayLoadSize();	char* buf = inPacket.getPayLoad();	int packetSize = inPacket.getPacketSize();	// This is taken from old version of code	struct timeval tvStart, tvEnd;	gettimeofday(&tvStart, NULL);		pos += snprintf(buf + pos, packetSize - pos, "HTTP/1.1 200 OK\r\n");	pos += snprintf(buf + pos, packetSize - pos, 		"Content-Type: text/html; charset=iso-8859-1\r\n\r\n");	pos += snprintf(buf + pos, packetSize - pos, "<HTML>\n"		"<title>Meridian</title>\n<body>\n");	pos += snprintf(buf + pos, packetSize - pos, 		"<H2>Meridian node: %s</H2>\n", g_hostname);	pos += snprintf(buf + pos, packetSize - pos, "<HR SIZE=\"0\">\n");	pos += snprintf(buf + pos, packetSize - pos,		"<STRONG>Ring members of %s</STRONG>\n", g_hostname);	pos += snprintf(buf + pos, packetSize - pos, 		"<TABLE cellspacing=\"3\" cellPadding=\"3\" width=\"100%s\" "		"border=\"1\">\n<TBODY>\n", "%");			int numRings = g_rings->getNumberOfRings();						for (int i = 0; i < numRings; i++) {		const vector<NodeIdent>* primRing = g_rings->returnPrimaryRing(i);		if (primRing != NULL && primRing->size() > 0) {			pos += snprintf(buf + pos, packetSize - pos,				"<TR><TD><STRONG>Nodes in ring %d</STRONG></TD>"				"<TD><STRONG>Latency</STRONG></TD></TR>\n", i);									for (u_int j = 0; j < primRing->size(); j++) {				u_int latencyUS = 0;				if (g_rings->getNodeLatency((*primRing)[j], &latencyUS) == -1) {					assert(false);									}								u_int netAddr = htonl((*primRing)[j].addr);				char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr));				pos += snprintf(buf + pos, packetSize - pos,					"<TR><TD>%s</TD><TD>%0.3f ms</TD></TR>\n", 					ringNodeStr, latencyUS / 1000.0);			}		}	}		pos += snprintf(buf + pos, packetSize - pos, "</TBODY>\n</TABLE>\n");	gettimeofday(&tvEnd, NULL);	pos += snprintf(buf + pos, packetSize - pos,		"<BR>Time to create this page is %0.2f ms\n",		((tvEnd.tv_sec - tvStart.tv_sec) * 1000000 +  		(tvEnd.tv_usec - tvStart.tv_usec)) / 1000.0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -