⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 meridianprocess.cpp

📁 件主要用于帮助计算机爱好者学习蚁群算法时做有关蚁群算法的试验。蚁群算法作为一种优秀的新兴的算法
💻 CPP
📖 第 1 页 / 共 4 页
字号:
	pos += snprintf(buf + pos, packetSize - pos, "\n</body>\n</HTML>");			inPacket.setPayLoadSize(pos);	// Reset payload size	if (!(inPacket.completeOkay())) {		assert(false);		return -1;	// This should never happen	}	return 0;}int MeridianProcess::handleInfoConnections(		fd_set* curReadSet, fd_set* curWriteSet) {				if (g_infoSock == -1) {		return 0;	// Info service not started, just exit function	}	u_int numConnections = 0;	// Keep a counter of the number of live								// connections in order to put a upper bound									// Handle existing requests	vector<list<pair<int, RealPacket*>*>::iterator> deleteVector;		list<pair<int, RealPacket*>*>::iterator conIt = g_infoConnects.begin();	for (; conIt != g_infoConnects.end(); conIt++) {		numConnections++;		if (FD_ISSET((*conIt)->first, curReadSet)) {			int recvRet = 				recv((*conIt)->first, g_webDrainBuf, DRAIN_BUFFER_SIZE, 0);							if (recvRet == -1 || recvRet == 0) {								deleteVector.push_back(conIt);	// Error reading							} else {				//	HACK: If the received first character is M, then return				//	a binary packet with info instead of a formatted string									if (g_webDrainBuf[0] == 'M') {					// Fill using info packet					InfoPacket tmpInfo(0, getRings());					if (tmpInfo.createRealPacket(*((*conIt)->second)) == -1) {						deleteVector.push_back(conIt);						continue;					}				} else {					//	Fill a formatted output					if (getInfoPacket(*((*conIt)->second)) == -1) {						deleteVector.push_back(conIt);						continue;					}					}				FD_CLR((*conIt)->first, &g_readSet);				FD_SET((*conIt)->first, &g_writeSet);			}		} else if (FD_ISSET((*conIt)->first, curWriteSet)) {			//	Write out the info to the browser			RealPacket* curPacket = (*conIt)->second; 			int sendRet = send((*conIt)->first, 				curPacket->getPayLoad() + curPacket->getPos(), 				curPacket->getPayLoadSize() - curPacket->getPos(), 0);			if (sendRet == -1 || 				sendRet == curPacket->getPayLoadSize() - curPacket->getPos()) {					deleteVector.push_back(conIt);			} else {				curPacket->incrPos(sendRet);			}		}	}	//	Check for new requests	if (FD_ISSET(g_infoSock, curReadSet)) {			struct sockaddr_in tmpAddr;		int sinSize = sizeof(struct sockaddr_in);		int infoSock = accept(g_infoSock,			(struct sockaddr*)&tmpAddr, (socklen_t*)&sinSize);		if (infoSock != -1) {						NodeIdent remoteNode = {ntohl(tmpAddr.sin_addr.s_addr), 									ntohs(tmpAddr.sin_port) };												RealPacket* inPacket = 				new RealPacket(remoteNode, MAX_INFO_PACKET_SIZE);			if (inPacket != NULL) {				g_infoConnects.push_back(					new pair<int, RealPacket*>(infoSock, inPacket));				numConnections++;									//	Only allow MAX_INFO_CONNECTIONS simultaneous connections				if (numConnections > 						(MAX_INFO_CONNECTIONS + deleteVector.size())) {					deleteVector.push_back(g_infoConnects.begin());								}				FD_SET(infoSock, &g_readSet);				g_maxFD = MAX(g_maxFD, infoSock);			} else {				ERROR_LOG("Cannot create RealPacket\n");				close(infoSock);				}		}	}		//	Cleanup all finished connections	for (u_int i = 0; i < deleteVector.size(); i++) {		pair<int, RealPacket*>* curPair = *(deleteVector[i]);		g_infoConnects.erase(deleteVector[i]);		FD_CLR(curPair->first, &g_readSet);						FD_CLR(curPair->first, &g_writeSet);		close(curPair->first);				delete curPair->second;		delete curPair;	}	return 0;}int MeridianProcess::increaseSockBuf(int sock) {	for (int i = 16; i >= 0; i--) {		int sockBufMax = 1 << i;	// 32K		int retRCV = setsockopt(sock, SOL_SOCKET, 			SO_RCVBUF, &sockBufMax, sizeof(sockBufMax));		int retSND = setsockopt(sock, SOL_SOCKET, 			SO_SNDBUF, &sockBufMax, sizeof(sockBufMax));		if (retRCV == 0 && retSND == 0) {			WARN_LOG_1("Socket buffer size is %d\n", sockBufMax);			break;			}	}	return 0;}	int MeridianProcess::start() {	signal(SIGPIPE, SIG_IGN);	// Ignore sigpipe			srand(time(NULL));	// Set random seed	gethostname(g_hostname, HOST_NAME_MAX);	//	Get the host name of this node	g_hostname[HOST_NAME_MAX - 1] = '\0';	struct hostent* he = gethostbyname(g_hostname);	if (he == NULL) {		perror("Cannot resolve localhost\n");		return -1;	}	g_localAddr = ntohl(((struct in_addr *)(he->h_addr))->s_addr);	//	Main meridian UDP socket	if ((g_meridSock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {		perror("Cannot create UDP socket");					return -1;	}	increaseSockBuf(g_meridSock);		//	Set up to listen to meridian port	struct sockaddr_in myAddr;	myAddr.sin_family 		= AF_INET;	myAddr.sin_port 		= htons(g_meridPort);	myAddr.sin_addr.s_addr 	= INADDR_ANY;	memset(&(myAddr.sin_zero), '\0', 8);	if (bind(g_meridSock, (struct sockaddr*)&myAddr, 			sizeof(struct sockaddr)) == -1) {		ERROR_LOG("Cannot bind UDP socket to desired port\n");		return -1;	}	//	Make the socket non-blocking	if (setNonBlock(g_meridSock) == -1) {		ERROR_LOG("Cannot set socket to be non-blocking\n");				return -1;	}#ifdef MERIDIAN_DSL		// Used only to allow scheduling of processes	if ((g_dummySock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {		perror("Cannot create UDP socket");					return -1;	}	//FD_SET(g_dummySock, &g_writeSet);	//g_maxFD = MAX(g_dummySock, g_maxFD); #endif		//	Adding socket to read set 	FD_SET(g_meridSock, &g_readSet);	g_maxFD = MAX(g_meridSock, g_maxFD);	//	Adding stop fd to read set	FD_SET(g_stopFD, &g_readSet);	g_maxFD = MAX(g_stopFD, g_maxFD);	//	An info port of 0 means that no info service should be started	if (g_infoPort > 0) { 		//	Create listener for info requests		if ((g_infoSock = createTCPListener(g_infoPort)) == -1) {			ERROR_LOG("Cannot create TCP listener socket (info port)");			return -1;					}		//	Adding socket to read set 		FD_SET(g_infoSock, &g_readSet);		g_maxFD = MAX(g_infoSock, g_maxFD);			}	//	If this node is not behind a firewall, it can potentially be a	//	rendavous point for another node	if (g_rendvNode.addr == 0 && g_rendvNode.port == 0) {		if ((g_rendvListener = createTCPListener(g_meridPort)) == -1) {			perror("Cannot create TCP listener socket (rendavous port)");							} else {			FD_SET(g_rendvListener, &g_readSet);			g_maxFD = MAX(g_rendvListener, g_maxFD);		}	} else {		g_rendvFD = createRendavousTunnel(g_rendvNode);		if (g_rendvFD != -1) {			FD_SET(g_rendvFD, &g_readSet);			g_maxFD = MAX(g_rendvFD, g_maxFD);		} else {			ERROR_LOG("FATAL: Cannot create rendavous tunnel\n");			// TODO: Might be a better way to handle this			return -1;		}	}	#ifdef PLANET_LAB_SUPPORT	if (createICMPSocket() == -1) {		ERROR_LOG("Cannot create ICMP socket\n");		return -1;	}	// TODO: call setuid to not be root anymore		//	Adding socket to read set 	FD_SET(g_icmpSock, &g_readSet);	g_maxFD = MAX(g_icmpSock, g_maxFD);	#endif		// Add all seed nodes as ring members (performs probing)	for (u_int i = 0; i < g_seedNodes.size(); i++) {		NodeIdentRendv tmpNIR = g_seedNodes[i];		if ((tmpNIR.addr != g_localAddr) || (tmpNIR.port != g_meridPort)) {			addNodeToRing(tmpNIR);			} else {			WARN_LOG("Cannot add itself as a seed node\n");			}	}	g_seedNodes.clear();	// Don't need them anymore		//	Perform gossip at next gossipInterval				SchedGossip gossipCallBack(this);	QueryScheduler* gossipScheduler = new QueryScheduler(		g_initGossipInterval_s * 1000, 		g_numInitIntervalRemain, g_ssGossipInterval_s * 1000, this, 		&gossipCallBack);	if (g_queryTable.insertNewQuery(gossipScheduler) == -1) {				ERROR_LOG("Cannot add gossip scheduler\n");		delete gossipScheduler;	} else {		gossipScheduler->init();		}	//	Perform ring replacement at next ring replacement interval				SchedRingManage ringCallBack(this);	QueryScheduler* ringScheduler = new QueryScheduler(0, 0, 		g_replaceInterval_s * 1000, this, &ringCallBack);	if (g_queryTable.insertNewQuery(ringScheduler) == -1) {				ERROR_LOG("Cannot add ring management scheduler\n");		delete ringScheduler;	} else {		ringScheduler->init();		}	//	Declaring structures that will be reused over and over	fd_set currentReadSet, currentWriteSet;	struct timeval curTime;	struct timeval nextEventTime;	struct timeval timeOutTV;	//	Main event driven select loop	while (true) {			//	Set timeout					gettimeofday(&curTime, NULL);					g_queryTable.nextTimeout(&nextEventTime);				//	Set time out length		if (timeoutLength(&curTime, &nextEventTime, &timeOutTV) == -1) {						evaluateTimeout();	//	Already expired			continue;	// Loop again		}		//	Reset fd_set values		memcpy(&currentReadSet, &g_readSet, sizeof(fd_set));		memcpy(&currentWriteSet, &g_writeSet, sizeof(fd_set));				int selectRet = select(g_maxFD+1, 			&currentReadSet, &currentWriteSet, NULL, &timeOutTV);					if (selectRet == -1) {			if (errno == EINTR) {									continue; // Interrupted by signal, retry			}			ERROR_LOG("Select returned an error\n");			return -1;	// Return with error		} else if (selectRet == 0) {					evaluateTimeout();				continue;		}		if (FD_ISSET(g_stopFD, &currentReadSet)) {			ERROR_LOG("Received stop request\n");			//	Don't even bother reading it			break;					}		if (FD_ISSET(g_meridSock, &currentReadSet)) {			readPacket();			}		if (FD_ISSET(g_meridSock, &currentWriteSet)) {			writePending();		}#ifdef PLANET_LAB_SUPPORT		if (FD_ISSET(g_icmpSock, &currentReadSet)) {			WARN_LOG("ICMP Read pending!!!!\n");			readICMPPacket();			}		if (FD_ISSET(g_icmpSock, &currentWriteSet)) {			icmpWritePending();							}#endif		#ifdef MERIDIAN_DSL				if (FD_ISSET(g_dummySock, &currentWriteSet)) {			//vector<list<uint64_t>::iterator> delete_vect;			vector<list<uint64_t>::iterator> clear_vect;			list<uint64_t>::iterator psIt = g_psList.begin();			vector<NodeIdentLat> dummyVect; #define MAX_THREADS_PER_ITERATION 	5			for (int itCount = 0; psIt != g_psList.end() && 					itCount < MAX_THREADS_PER_ITERATION; psIt++, itCount++) {				uint64_t curQueryID = *psIt;				clear_vect.push_back(psIt);				if (getQueryTable()->isQueryInTable(curQueryID)) {					const DSLRecvQuery* thisQ 						= getQueryTable()->getDSLRecvQ(curQueryID);					// If thisQ is NULL (error due to cast?), remove from list					if (thisQ == NULL) {						fprintf(stderr,								"g_psList contains a non-DSLRecvQuery query\n");						continue;					}					// If thread is blocked, remove from g_psList					if (thisQ->parserState() == PS_BLOCKED) {						//printf("Thread is blocked, skip\n");						continue;						}					getQueryTable()->notifyQLatency(curQueryID, dummyVect);					// If thread no longer active, remove from g_psList					if (getQueryTable()->isQueryInTable(curQueryID)) {						g_psList.push_back(curQueryID);																	}																			} 														}			//	Just remove it from the list, as we moved 			//	it to another position			for (u_int i = 0; i < clear_vect.size(); i++) {				g_psList.erase(clear_vect[i]);			}						//	Turn off trigger if no process need to be executed			if (g_psList.empty()) {				FD_CLR(g_dummySock, &g_writeSet);				}		}#endif				if (handleRendavous(&currentReadSet, &currentWriteSet) == -1) { 			break;	// Connection to rendavous node broken		}		handleInfoConnections(&currentReadSet, &currentWriteSet);		handleTCPConnections(&currentReadSet, &currentWriteSet);		handleDNSConnections(&currentReadSet, &currentWriteSet);	}	return 0;}int MeridianProcess::handleTCPConnections(		fd_set* curReadSet, fd_set* curWriteSet) {				// Handle existing requests	vector<map<int, pair<uint64_t, NodeIdent>*>::iterator> deleteVector;		map<int, pair<uint64_t, NodeIdent>*>::iterator conIt 		= g_tcpProbeConnections.begin();	for (; conIt != g_tcpProbeConnections.end(); conIt++) {		if (FD_ISSET(conIt->first, curWriteSet)) {						struct sockaddr	peerAddr;			socklen_t peerLen = sizeof(struct sockaddr);						if (getpeername(conIt->first, &peerAddr, &peerLen) != -1){							pair<uint64_t, NodeIdent>* thisPair = conIt->second;				//	Pass back latency of 0, as the timing is done 				//	within the query, not in the TCP connection				NodeIdentLat outNIL = 					{(thisPair->second).addr, (thisPair->second).port, 0};				vector<NodeIdentLat> subVect;				subVect.push_back(outNIL);								g_queryTable.notifyQLatency(thisPair->first, subVect);			}			// TODO: Add notifyError for quicker notification of error			deleteVector.push_back(conIt);	// We can delete this now		}	}		//	Cleanup all finished connections	for (u_int i = 0; i < deleteVector.size(); i++) {		eraseTCPConnection(deleteVector[i]);	}	return 0;}int MeridianProcess::handleDNSConnections(		fd_set* curReadSet, fd_set* curWriteSet) {				// Handle existing requests	vector<map<int, pair<uint64_t, NodeIdent>*>::iterator> deleteVector;		map<int, pair<uint64_t, NodeIdent>*>::iterator conIt 		= g_dnsProbeConnections.begin();	for (; conIt != g_dnsProbeConnections.end(); conIt++) {				if (FD_ISSET(conIt->first, curReadSet)) {			WARN_LOG("Response from DNS server\n");			pair<uint64_t, NodeIdent>* thisPair = conIt->second;			//	Pass back latency of 0, as the timing is done 			//	within the query, not in the DNS connection			NodeIdentLat outNIL = 				{(thisPair->second).addr, (thisPair->second).port, 0};			vector<NodeIdentLat> subVect;			subVect.push_back(outNIL);							g_queryTable.notifyQLatency(thisPair->first, subVect);			deleteVector.push_back(conIt);	// We can delete this now		}	}		//	Cleanup all finished connections	for (u_int i = 0; i < deleteVector.size(); i++) {		eraseDNSConnection(deleteVector[i]);	}	return 0;}int MeridianProcess::removeRendavousConnection(		map<NodeIdent, int, ltNodeIdent>::iterator& in_it) {	int oldSock = in_it->second;	g_rendvConnections.erase(in_it);	map<int, list<RealPacket*>*>::iterator queueIt =		g_rendvQueue.find(oldSock);	assert(queueIt != g_rendvQueue.end());	list<RealPacket*>* packetList = queueIt->second;	//	Remove from rendvQueue	g_rendvQueue.erase(queueIt);	//	Delete all RealPackets in queue			list<RealPacket*>::iterator listIt = packetList->begin();	for (; listIt != packetList->end(); listIt++) {		delete *listIt;	}			delete packetList;	//	Delete the queue itself	//	Make sure we FD_CLR and close the socket	FD_CLR(oldSock, &g_writeSet);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -