📄 meridianprocess.cpp
字号:
pos += snprintf(buf + pos, packetSize - pos, "\n</body>\n</HTML>"); inPacket.setPayLoadSize(pos); // Reset payload size if (!(inPacket.completeOkay())) { assert(false); return -1; // This should never happen } return 0;}int MeridianProcess::handleInfoConnections( fd_set* curReadSet, fd_set* curWriteSet) { if (g_infoSock == -1) { return 0; // Info service not started, just exit function } u_int numConnections = 0; // Keep a counter of the number of live // connections in order to put a upper bound // Handle existing requests vector<list<pair<int, RealPacket*>*>::iterator> deleteVector; list<pair<int, RealPacket*>*>::iterator conIt = g_infoConnects.begin(); for (; conIt != g_infoConnects.end(); conIt++) { numConnections++; if (FD_ISSET((*conIt)->first, curReadSet)) { int recvRet = recv((*conIt)->first, g_webDrainBuf, DRAIN_BUFFER_SIZE, 0); if (recvRet == -1 || recvRet == 0) { deleteVector.push_back(conIt); // Error reading } else { // HACK: If the received first character is M, then return // a binary packet with info instead of a formatted string if (g_webDrainBuf[0] == 'M') { // Fill using info packet InfoPacket tmpInfo(0, getRings()); if (tmpInfo.createRealPacket(*((*conIt)->second)) == -1) { deleteVector.push_back(conIt); continue; } } else { // Fill a formatted output if (getInfoPacket(*((*conIt)->second)) == -1) { deleteVector.push_back(conIt); continue; } } FD_CLR((*conIt)->first, &g_readSet); FD_SET((*conIt)->first, &g_writeSet); } } else if (FD_ISSET((*conIt)->first, curWriteSet)) { // Write out the info to the browser RealPacket* curPacket = (*conIt)->second; int sendRet = send((*conIt)->first, curPacket->getPayLoad() + curPacket->getPos(), curPacket->getPayLoadSize() - curPacket->getPos(), 0); if (sendRet == -1 || sendRet == curPacket->getPayLoadSize() - curPacket->getPos()) { deleteVector.push_back(conIt); } else { curPacket->incrPos(sendRet); } } } // Check for new requests if (FD_ISSET(g_infoSock, curReadSet)) { struct sockaddr_in tmpAddr; int sinSize = sizeof(struct sockaddr_in); int infoSock = accept(g_infoSock, (struct sockaddr*)&tmpAddr, (socklen_t*)&sinSize); if (infoSock != -1) { NodeIdent remoteNode = {ntohl(tmpAddr.sin_addr.s_addr), ntohs(tmpAddr.sin_port) }; RealPacket* inPacket = new RealPacket(remoteNode, MAX_INFO_PACKET_SIZE); if (inPacket != NULL) { g_infoConnects.push_back( new pair<int, RealPacket*>(infoSock, inPacket)); numConnections++; // Only allow MAX_INFO_CONNECTIONS simultaneous connections if (numConnections > (MAX_INFO_CONNECTIONS + deleteVector.size())) { deleteVector.push_back(g_infoConnects.begin()); } FD_SET(infoSock, &g_readSet); g_maxFD = MAX(g_maxFD, infoSock); } else { ERROR_LOG("Cannot create RealPacket\n"); close(infoSock); } } } // Cleanup all finished connections for (u_int i = 0; i < deleteVector.size(); i++) { pair<int, RealPacket*>* curPair = *(deleteVector[i]); g_infoConnects.erase(deleteVector[i]); FD_CLR(curPair->first, &g_readSet); FD_CLR(curPair->first, &g_writeSet); close(curPair->first); delete curPair->second; delete curPair; } return 0;}int MeridianProcess::increaseSockBuf(int sock) { for (int i = 16; i >= 0; i--) { int sockBufMax = 1 << i; // 32K int retRCV = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &sockBufMax, sizeof(sockBufMax)); int retSND = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sockBufMax, sizeof(sockBufMax)); if (retRCV == 0 && retSND == 0) { WARN_LOG_1("Socket buffer size is %d\n", sockBufMax); break; } } return 0;} int MeridianProcess::start() { signal(SIGPIPE, SIG_IGN); // Ignore sigpipe srand(time(NULL)); // Set random seed gethostname(g_hostname, HOST_NAME_MAX); // Get the host name of this node g_hostname[HOST_NAME_MAX - 1] = '\0'; struct hostent* he = gethostbyname(g_hostname); if (he == NULL) { perror("Cannot resolve localhost\n"); return -1; } g_localAddr = ntohl(((struct in_addr *)(he->h_addr))->s_addr); // Main meridian UDP socket if ((g_meridSock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { perror("Cannot create UDP socket"); return -1; } increaseSockBuf(g_meridSock); // Set up to listen to meridian port struct sockaddr_in myAddr; myAddr.sin_family = AF_INET; myAddr.sin_port = htons(g_meridPort); myAddr.sin_addr.s_addr = INADDR_ANY; memset(&(myAddr.sin_zero), '\0', 8); if (bind(g_meridSock, (struct sockaddr*)&myAddr, sizeof(struct sockaddr)) == -1) { ERROR_LOG("Cannot bind UDP socket to desired port\n"); return -1; } // Make the socket non-blocking if (setNonBlock(g_meridSock) == -1) { ERROR_LOG("Cannot set socket to be non-blocking\n"); return -1; }#ifdef MERIDIAN_DSL // Used only to allow scheduling of processes if ((g_dummySock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { perror("Cannot create UDP socket"); return -1; } //FD_SET(g_dummySock, &g_writeSet); //g_maxFD = MAX(g_dummySock, g_maxFD); #endif // Adding socket to read set FD_SET(g_meridSock, &g_readSet); g_maxFD = MAX(g_meridSock, g_maxFD); // Adding stop fd to read set FD_SET(g_stopFD, &g_readSet); g_maxFD = MAX(g_stopFD, g_maxFD); // An info port of 0 means that no info service should be started if (g_infoPort > 0) { // Create listener for info requests if ((g_infoSock = createTCPListener(g_infoPort)) == -1) { ERROR_LOG("Cannot create TCP listener socket (info port)"); return -1; } // Adding socket to read set FD_SET(g_infoSock, &g_readSet); g_maxFD = MAX(g_infoSock, g_maxFD); } // If this node is not behind a firewall, it can potentially be a // rendavous point for another node if (g_rendvNode.addr == 0 && g_rendvNode.port == 0) { if ((g_rendvListener = createTCPListener(g_meridPort)) == -1) { perror("Cannot create TCP listener socket (rendavous port)"); } else { FD_SET(g_rendvListener, &g_readSet); g_maxFD = MAX(g_rendvListener, g_maxFD); } } else { g_rendvFD = createRendavousTunnel(g_rendvNode); if (g_rendvFD != -1) { FD_SET(g_rendvFD, &g_readSet); g_maxFD = MAX(g_rendvFD, g_maxFD); } else { ERROR_LOG("FATAL: Cannot create rendavous tunnel\n"); // TODO: Might be a better way to handle this return -1; } } #ifdef PLANET_LAB_SUPPORT if (createICMPSocket() == -1) { ERROR_LOG("Cannot create ICMP socket\n"); return -1; } // TODO: call setuid to not be root anymore // Adding socket to read set FD_SET(g_icmpSock, &g_readSet); g_maxFD = MAX(g_icmpSock, g_maxFD); #endif // Add all seed nodes as ring members (performs probing) for (u_int i = 0; i < g_seedNodes.size(); i++) { NodeIdentRendv tmpNIR = g_seedNodes[i]; if ((tmpNIR.addr != g_localAddr) || (tmpNIR.port != g_meridPort)) { addNodeToRing(tmpNIR); } else { WARN_LOG("Cannot add itself as a seed node\n"); } } g_seedNodes.clear(); // Don't need them anymore // Perform gossip at next gossipInterval SchedGossip gossipCallBack(this); QueryScheduler* gossipScheduler = new QueryScheduler( g_initGossipInterval_s * 1000, g_numInitIntervalRemain, g_ssGossipInterval_s * 1000, this, &gossipCallBack); if (g_queryTable.insertNewQuery(gossipScheduler) == -1) { ERROR_LOG("Cannot add gossip scheduler\n"); delete gossipScheduler; } else { gossipScheduler->init(); } // Perform ring replacement at next ring replacement interval SchedRingManage ringCallBack(this); QueryScheduler* ringScheduler = new QueryScheduler(0, 0, g_replaceInterval_s * 1000, this, &ringCallBack); if (g_queryTable.insertNewQuery(ringScheduler) == -1) { ERROR_LOG("Cannot add ring management scheduler\n"); delete ringScheduler; } else { ringScheduler->init(); } // Declaring structures that will be reused over and over fd_set currentReadSet, currentWriteSet; struct timeval curTime; struct timeval nextEventTime; struct timeval timeOutTV; // Main event driven select loop while (true) { // Set timeout gettimeofday(&curTime, NULL); g_queryTable.nextTimeout(&nextEventTime); // Set time out length if (timeoutLength(&curTime, &nextEventTime, &timeOutTV) == -1) { evaluateTimeout(); // Already expired continue; // Loop again } // Reset fd_set values memcpy(¤tReadSet, &g_readSet, sizeof(fd_set)); memcpy(¤tWriteSet, &g_writeSet, sizeof(fd_set)); int selectRet = select(g_maxFD+1, ¤tReadSet, ¤tWriteSet, NULL, &timeOutTV); if (selectRet == -1) { if (errno == EINTR) { continue; // Interrupted by signal, retry } ERROR_LOG("Select returned an error\n"); return -1; // Return with error } else if (selectRet == 0) { evaluateTimeout(); continue; } if (FD_ISSET(g_stopFD, ¤tReadSet)) { ERROR_LOG("Received stop request\n"); // Don't even bother reading it break; } if (FD_ISSET(g_meridSock, ¤tReadSet)) { readPacket(); } if (FD_ISSET(g_meridSock, ¤tWriteSet)) { writePending(); }#ifdef PLANET_LAB_SUPPORT if (FD_ISSET(g_icmpSock, ¤tReadSet)) { WARN_LOG("ICMP Read pending!!!!\n"); readICMPPacket(); } if (FD_ISSET(g_icmpSock, ¤tWriteSet)) { icmpWritePending(); }#endif #ifdef MERIDIAN_DSL if (FD_ISSET(g_dummySock, ¤tWriteSet)) { //vector<list<uint64_t>::iterator> delete_vect; vector<list<uint64_t>::iterator> clear_vect; list<uint64_t>::iterator psIt = g_psList.begin(); vector<NodeIdentLat> dummyVect; #define MAX_THREADS_PER_ITERATION 5 for (int itCount = 0; psIt != g_psList.end() && itCount < MAX_THREADS_PER_ITERATION; psIt++, itCount++) { uint64_t curQueryID = *psIt; clear_vect.push_back(psIt); if (getQueryTable()->isQueryInTable(curQueryID)) { const DSLRecvQuery* thisQ = getQueryTable()->getDSLRecvQ(curQueryID); // If thisQ is NULL (error due to cast?), remove from list if (thisQ == NULL) { fprintf(stderr, "g_psList contains a non-DSLRecvQuery query\n"); continue; } // If thread is blocked, remove from g_psList if (thisQ->parserState() == PS_BLOCKED) { //printf("Thread is blocked, skip\n"); continue; } getQueryTable()->notifyQLatency(curQueryID, dummyVect); // If thread no longer active, remove from g_psList if (getQueryTable()->isQueryInTable(curQueryID)) { g_psList.push_back(curQueryID); } } } // Just remove it from the list, as we moved // it to another position for (u_int i = 0; i < clear_vect.size(); i++) { g_psList.erase(clear_vect[i]); } // Turn off trigger if no process need to be executed if (g_psList.empty()) { FD_CLR(g_dummySock, &g_writeSet); } }#endif if (handleRendavous(¤tReadSet, ¤tWriteSet) == -1) { break; // Connection to rendavous node broken } handleInfoConnections(¤tReadSet, ¤tWriteSet); handleTCPConnections(¤tReadSet, ¤tWriteSet); handleDNSConnections(¤tReadSet, ¤tWriteSet); } return 0;}int MeridianProcess::handleTCPConnections( fd_set* curReadSet, fd_set* curWriteSet) { // Handle existing requests vector<map<int, pair<uint64_t, NodeIdent>*>::iterator> deleteVector; map<int, pair<uint64_t, NodeIdent>*>::iterator conIt = g_tcpProbeConnections.begin(); for (; conIt != g_tcpProbeConnections.end(); conIt++) { if (FD_ISSET(conIt->first, curWriteSet)) { struct sockaddr peerAddr; socklen_t peerLen = sizeof(struct sockaddr); if (getpeername(conIt->first, &peerAddr, &peerLen) != -1){ pair<uint64_t, NodeIdent>* thisPair = conIt->second; // Pass back latency of 0, as the timing is done // within the query, not in the TCP connection NodeIdentLat outNIL = {(thisPair->second).addr, (thisPair->second).port, 0}; vector<NodeIdentLat> subVect; subVect.push_back(outNIL); g_queryTable.notifyQLatency(thisPair->first, subVect); } // TODO: Add notifyError for quicker notification of error deleteVector.push_back(conIt); // We can delete this now } } // Cleanup all finished connections for (u_int i = 0; i < deleteVector.size(); i++) { eraseTCPConnection(deleteVector[i]); } return 0;}int MeridianProcess::handleDNSConnections( fd_set* curReadSet, fd_set* curWriteSet) { // Handle existing requests vector<map<int, pair<uint64_t, NodeIdent>*>::iterator> deleteVector; map<int, pair<uint64_t, NodeIdent>*>::iterator conIt = g_dnsProbeConnections.begin(); for (; conIt != g_dnsProbeConnections.end(); conIt++) { if (FD_ISSET(conIt->first, curReadSet)) { WARN_LOG("Response from DNS server\n"); pair<uint64_t, NodeIdent>* thisPair = conIt->second; // Pass back latency of 0, as the timing is done // within the query, not in the DNS connection NodeIdentLat outNIL = {(thisPair->second).addr, (thisPair->second).port, 0}; vector<NodeIdentLat> subVect; subVect.push_back(outNIL); g_queryTable.notifyQLatency(thisPair->first, subVect); deleteVector.push_back(conIt); // We can delete this now } } // Cleanup all finished connections for (u_int i = 0; i < deleteVector.size(); i++) { eraseDNSConnection(deleteVector[i]); } return 0;}int MeridianProcess::removeRendavousConnection( map<NodeIdent, int, ltNodeIdent>::iterator& in_it) { int oldSock = in_it->second; g_rendvConnections.erase(in_it); map<int, list<RealPacket*>*>::iterator queueIt = g_rendvQueue.find(oldSock); assert(queueIt != g_rendvQueue.end()); list<RealPacket*>* packetList = queueIt->second; // Remove from rendvQueue g_rendvQueue.erase(queueIt); // Delete all RealPackets in queue list<RealPacket*>::iterator listIt = packetList->begin(); for (; listIt != packetList->end(); listIt++) { delete *listIt; } delete packetList; // Delete the queue itself // Make sure we FD_CLR and close the socket FD_CLR(oldSock, &g_writeSet);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -