📄 meridianprocess.cpp
字号:
/******************************************************************************Meridian prototype distributionCopyright (C) 2005 Bernard WongThis program is free software; you can redistribute it and/ormodify it under the terms of the GNU General Public Licenseas published by the Free Software Foundation; either version 2of the License, or (at your option) any later version.This program is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See theGNU General Public License for more details.You should have received a copy of the GNU General Public Licensealong with this program; if not, write to the Free SoftwareFoundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.The copyright owner can be contacted by e-mail at bwong@cs.cornell.edu*******************************************************************************/using namespace std;#include <stdlib.h>#include <stdio.h>#include <errno.h>#include <netdb.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/param.h>#include <unistd.h>#include <fcntl.h>#include <string.h>#include <netinet/in.h>#include <arpa/inet.h>#include <netinet/tcp.h>#include <limits.h>#include <resolv.h>#include <signal.h>#include "Marshal.h"#include "MeridianProcess.h" int MeridianProcess::createRendavousTunnel(const NodeIdent& rendvNode) { int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock == -1) { ERROR_LOG("Cannot create sock\n"); return -1; } int opt = 1; if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) < 0) { ERROR_LOG("setsockopt reuseaddr failed\n"); close(sock); return -1; } // Set socket options that are necessary for rate limiting opt = 1; if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(int)) == -1) { //perror("setsockopt"); ERROR_LOG("Cannot set TCP_NODELAY\n"); close(sock); return -1; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(g_meridPort); addr.sin_addr.s_addr = INADDR_ANY; memset(&(addr.sin_zero), '\0', 8); if (bind(sock, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1 || setNonBlock(sock) == -1) { ERROR_LOG("Cannot bind or set non block\n"); close(sock); return -1; } if (performConnect(sock, rendvNode.addr, rendvNode.port) == -1) { ERROR_LOG("Perform connect returns -1\n"); close(sock); return -1; } // Set the socket to be writeable fd_set curWriteSet, writeSet; FD_ZERO(&writeSet); FD_SET(sock, &writeSet); // Set up the current time and the future timeout time struct timeval curTime, maxTime, timeout; gettimeofday(&maxTime, NULL); maxTime.tv_sec += DEFAULT_TIME_OUT_S; while (true) { memcpy(&curWriteSet, &writeSet, sizeof(fd_set)); gettimeofday(&curTime, NULL); if (timeoutLength(&curTime, &maxTime, &timeout) == -1) { WARN_LOG("Timeout out in connecting to rendavous\n"); break; } int selectRet = select(sock+1, NULL, &curWriteSet, NULL, &timeout); if (selectRet == -1) { if (errno == EINTR) { continue; // Interrupted by signal, retry } WARN_LOG("Select returned error in rendavous\n"); break; } else if (selectRet == 0) { continue; // Let the timeoutLength part check the time } else if (FD_ISSET(sock, &curWriteSet)) { struct sockaddr peerAddr; socklen_t peerLen = sizeof(struct sockaddr); if (getpeername(sock, &peerAddr, &peerLen) != -1){ return sock; // Connection completed successfully } WARN_LOG("Call to getpeername fails\n"); } break; // Would have called continue if we wanted to loop } // If we had to break out of the loop, then the connect failed, // or timed out. Close and return -1 close(sock); return -1;}int MeridianProcess::setNonBlock(int fd) { int sockflags; if ((sockflags = fcntl(fd, F_GETFL, 0)) != -1){ sockflags |= O_NONBLOCK; return fcntl(fd, F_SETFL, sockflags); } return -1;} // Creates a TCP listener socket int MeridianProcess::createTCPListener(u_short port) { int sock = socket(AF_INET, SOCK_STREAM, 0); int opt = 1; if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) < 0) { perror("setsockopt reuseaddr failed\n"); close(sock); return -1; } // Set socket options that are necessary for rate limiting opt = 1; if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(int)) == -1) { perror("setsockopt"); close(sock); return -1; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; memset(&(addr.sin_zero), '\0', 8); if (bind(sock, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1 || listen(sock, TCP_LISTEN_BACKLOG) == -1 || setNonBlock(sock) == -1) { close(sock); return -1; } return sock;}int MeridianProcess::performConnect( int sendSock, uint32_t addr, uint16_t port) { struct sockaddr_in hostAddr; hostAddr.sin_family = AF_INET; hostAddr.sin_port = htons(port); hostAddr.sin_addr.s_addr = htonl(addr); memset(&(hostAddr.sin_zero), '\0', 8); if (connect(sendSock, (struct sockaddr*)&hostAddr, sizeof(struct sockaddr)) == -1 && errno != EINPROGRESS) { return -1; } return 0;}int MeridianProcess::timeoutLength(struct timeval* curTime, struct timeval* nextEventTime, struct timeval* timeOutTV) { QueryTable::normalizeTime(*curTime); QueryTable::normalizeTime(*nextEventTime); if (curTime->tv_sec > nextEventTime->tv_sec || (curTime->tv_sec == nextEventTime->tv_sec && curTime->tv_usec >= nextEventTime->tv_usec)) { return -1; // Expired } timersub(nextEventTime, curTime, timeOutTV); return 0;}int MeridianProcess::eraseTCPConnection( const map<int, pair<uint64_t, NodeIdent>*>::iterator& conIt) { int tmpSock = conIt->first; pair<uint64_t, NodeIdent>* curPair = conIt->second; g_tcpProbeConnections.erase(conIt); FD_CLR(tmpSock, &g_writeSet); close(tmpSock); delete curPair; return 0;}int MeridianProcess::eraseDNSConnection( const map<int, pair<uint64_t, NodeIdent>*>::iterator& conIt) { int tmpSock = conIt->first; pair<uint64_t, NodeIdent>* curPair = conIt->second; g_dnsProbeConnections.erase(conIt); FD_CLR(tmpSock, &g_readSet); // Note that we are clearing readSet close(tmpSock); delete curPair; return 0;} int MeridianProcess::eraseTCPConnection(int in_sock) { WARN_LOG("Entering eraseTCPConnection\n"); map<int, pair<uint64_t, NodeIdent>*>::iterator it = g_tcpProbeConnections.find(in_sock); if (it != g_tcpProbeConnections.end()) { WARN_LOG("Found something to erase\n"); return eraseTCPConnection(it); } // It might have been deleted already (tcp connect failed), so if // it is not in the map, don't do anything return 0;}int MeridianProcess::eraseDNSConnection(int in_sock) { WARN_LOG("Entering eraseDNSConnection\n"); map<int, pair<uint64_t, NodeIdent>*>::iterator it = g_dnsProbeConnections.find(in_sock); if (it != g_dnsProbeConnections.end()) { WARN_LOG("Found something to erase\n"); return eraseDNSConnection(it); } return 0;}MeridianProcess::MeridianProcess(u_short meridian_port, u_short info_port, u_int prim_size, u_int second_size, int ring_base, int stopFD) : g_meridPort(meridian_port), g_infoPort(info_port), g_meridSock(-1), g_infoSock(-1), g_rendvFD(-1), g_rendvListener(-1), g_maxFD(0), g_stopFD(stopFD)#ifdef MERIDIAN_DSL , g_dummySock(-1), g_max_ttl(DEFAULT_MAX_TTL)#endif#ifdef PLANET_LAB_SUPPORT , g_icmpSock(-1)#endif { // Packet used as a temp buffer to hold rendavous client traffic NodeIdent dummy = {0, 0}; g_rendvRecvPacket = new RealPacket(dummy); setRendavousNode(0, 0); FD_ZERO(&g_readSet); FD_ZERO(&g_writeSet); setGossipInterval(10, 10, 60); setReplaceInterval(300); g_rings = new RingSet(prim_size, second_size, ring_base); g_tcpCache = new LatencyCache(PROBE_CACHE_SIZE, PROBE_CACHE_TIMEOUT_US); g_dnsCache = new LatencyCache(PROBE_CACHE_SIZE, PROBE_CACHE_TIMEOUT_US); g_pingCache = new LatencyCache(PROBE_CACHE_SIZE, PROBE_CACHE_TIMEOUT_US);#ifdef PLANET_LAB_SUPPORT g_icmpCache = new LatencyCache(PROBE_CACHE_SIZE, PROBE_CACHE_TIMEOUT_US);#endif} MeridianProcess::~MeridianProcess() { // Delete caches if (g_tcpCache) { delete g_tcpCache; } if (g_dnsCache) { delete g_dnsCache; } if (g_pingCache) { delete g_pingCache; } // Delete ring set if (g_rings) { delete g_rings; } // Cleanup sockets if (g_meridSock != -1) { close(g_meridSock); } if (g_infoSock != -1) { close(g_infoSock); } if (g_rendvFD != -1) { close(g_rendvFD); } if (g_rendvListener != -1) { close(g_rendvListener); } if (g_stopFD != -1) { close(g_stopFD); } if (g_rendvRecvPacket) { delete g_rendvRecvPacket; } list<RealPacket*>::iterator packetIt = g_outPacketList.begin(); for (; packetIt != g_outPacketList.end(); packetIt++) { if (*packetIt) { delete *packetIt; } } list<pair<int, RealPacket*>*>::iterator infoIt = g_infoConnects.begin(); for (; infoIt != g_infoConnects.end(); infoIt++) { pair<int, RealPacket*>* curPair = *infoIt; if (curPair) { if (curPair->second) { delete curPair->second; } delete curPair; } } // Close all TCP connections and free structures map<int, pair<uint64_t, NodeIdent>*>::iterator mapIt = g_tcpProbeConnections.begin(); for (; mapIt != g_tcpProbeConnections.begin(); mapIt++) { if (mapIt->first != -1) { close(mapIt->first); } if (mapIt->second != NULL) { delete mapIt->second; } } // Close all DNS connections and free structures mapIt = g_dnsProbeConnections.begin(); for (; mapIt != g_dnsProbeConnections.begin(); mapIt++) { if (mapIt->first != -1) { close(mapIt->first); } if (mapIt->second != NULL) { delete mapIt->second; } } // Clean up all rendavous connections map<int, list<RealPacket*>*>::iterator rendvQIt = g_rendvQueue.begin(); for (; rendvQIt != g_rendvQueue.end(); rendvQIt++) { if (rendvQIt->second != NULL) { list<RealPacket*>::iterator listQIt = rendvQIt->second->begin(); for (; listQIt != rendvQIt->second->end(); listQIt++) { if (*listQIt) { delete *listQIt; } } delete rendvQIt->second; } if (rendvQIt->first != -1) { close(rendvQIt->first); } }#ifdef MERIDIAN_DSL if (g_dummySock != -1) { close(g_dummySock); }#endif#ifdef PLANET_LAB_SUPPORT if (g_icmpSock != -1) { close(g_icmpSock); } #endif }int MeridianProcess::evaluateTimeout() { WARN_LOG("TIMEOUT Encountered\n"); return g_queryTable.handleTimeout();}uint64_t MeridianProcess::getNewQueryID() { uint64_t retVal; for (u_int i = 0; i < USHRT_MAX; i++) { u_short randVal = rand() % USHRT_MAX; // Concat with port u_int secondParam = g_meridPort; secondParam = secondParam << 16; secondParam |= randVal; // Concat with address retVal = Packet::to64(g_localAddr, secondParam); // Check to see if it is used if (!(g_queryTable.isQueryInTable(retVal))) { return retVal; } } // The system seem to be processing near capcity, let's create a // random 64 value for a query id instead, which is unluckly to collide retVal = Packet::to64(rand(), rand()); return retVal;} int MeridianProcess::addNodeToRing(const NodeIdentRendv& in_remote) { // To avoid rapid pinging of a node, we skip ping nodes that // have been recently pinged uint32_t curLatencyUS; NodeIdent curIdent = {in_remote.addr, in_remote.port}; if (pingCacheGetLatency(curIdent, &curLatencyUS) != -1) { WARN_LOG("Skip pinging of recently pinged node\n"); return 0; } // Create query to encapsulate the state AddNodeQuery* newQuery = new AddNodeQuery(in_remote, this); if (g_queryTable.insertNewQuery(newQuery) == -1) { delete newQuery; return -1; } newQuery->init(); return 0;}int MeridianProcess::performGossip() { vector<NodeIdentRendv> randNodes; g_rings->getRandomNodes(randNodes); for (u_int i = 0; i < randNodes.size(); i++) { //NodeIdent remoteNode = {randNodes[i].addr, randNodes[i].port}; //GossipQuery* newQuery = new GossipQuery(remoteNode, this); GossipQuery* newQuery = new GossipQuery(randNodes[i], this); if (g_queryTable.insertNewQuery(newQuery) == -1) { delete newQuery; return -1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -