📄 meridianprocess.cpp
字号:
close(oldSock); return 0;}int MeridianProcess::handleRendavous( fd_set* curReadSet, fd_set* curWriteSet) { // If we are behind a firewall, check to see if any new data has been // pushed via the rendavous tunnel if (g_rendvFD != -1) { if (FD_ISSET(g_rendvFD, curReadSet)) { int recvRet = recv(g_rendvFD, g_rendvRecvPacket->getPayLoad() + g_rendvRecvPacket->getPayLoadSize(), g_rendvRecvPacket->getPacketSize() - g_rendvRecvPacket->getPayLoadSize(), 0); if (recvRet == -1 || recvRet == 0) { // Error reading ERROR_LOG("Rendavous host has disconnected\n"); // TODO: Need to find another host FD_CLR(g_rendvFD, &g_readSet); close(g_rendvFD); g_rendvFD = -1; return -1; } else { // Update payload size; g_rendvRecvPacket->setPayLoadSize( g_rendvRecvPacket->getPayLoadSize() + recvRet); // See if we have complete PULL packet. If we do, read // it and extract the inner packet. The rendv packet must also // be updated (portion of next packet must be moved to front // of buffer NodeIdent srcNode; RealPacket* newPacket = PullPacket::parse(*g_rendvRecvPacket, srcNode); if (newPacket) { handleNewPacket(newPacket->getPayLoad(), newPacket->getPayLoadSize(), srcNode); delete newPacket; } } } return 0; } // If we are a host to rendavous nodes // Check for new requests. This must happen before checking existing // connections, as we may modify the g_rendvConnections data-structure if (FD_ISSET(g_rendvListener, curReadSet)) { struct sockaddr_in tmpAddr; int sinSize = sizeof(struct sockaddr_in); int newRendvSock = accept(g_rendvListener, (struct sockaddr*)&tmpAddr, (socklen_t*)&sinSize); if (newRendvSock != -1) { NodeIdent remoteNode = {ntohl(tmpAddr.sin_addr.s_addr), ntohs(tmpAddr.sin_port) }; map<NodeIdent, int, ltNodeIdent>::iterator findRendvIt = g_rendvConnections.find(remoteNode); if (findRendvIt != g_rendvConnections.end()) { ERROR_LOG("Rendavous connection already exists\n"); ERROR_LOG("Closing existing connection\n"); removeRendavousConnection(findRendvIt); } #ifdef DEBUG u_int netAddr = htonl(remoteNode.addr); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("Adding connection for %s:%d\n", ringNodeStr, remoteNode.port);#endif g_rendvConnections[remoteNode] = newRendvSock; g_rendvQueue[newRendvSock] = new list<RealPacket*>(); } } // Handle existing connections vector<map<NodeIdent, int, ltNodeIdent>::iterator> deleteVector; map<NodeIdent, int, ltNodeIdent>::iterator it = g_rendvConnections.begin(); for (; it != g_rendvConnections.end(); it++) { if (FD_ISSET(it->second, curWriteSet)) { //WARN_LOG("Ready to write through tunnel\n"); map<int, list<RealPacket*>*>::iterator queueIt = g_rendvQueue.find(it->second); assert(queueIt != g_rendvQueue.end()); list<RealPacket*>* packetList = queueIt->second; RealPacket* curPacket = packetList->front(); assert(curPacket != NULL); int sendRet = send(it->second, curPacket->getPayLoad() + curPacket->getPos(), curPacket->getPayLoadSize() - curPacket->getPos(), 0); if (sendRet == -1 || sendRet == 0) { ERROR_LOG("Connection to rendvaous client closed\n"); // Connection closed deleteVector.push_back(it); } else if (sendRet == (curPacket->getPayLoadSize() - curPacket->getPos())) { packetList->pop_front(); if (packetList->empty()) { FD_CLR(it->second, &g_writeSet); // Turn off writeable } delete curPacket; } else { curPacket->incrPos(sendRet); } } } // All dead connections are cleaned up here for (u_int i = 0; i < deleteVector.size(); i++) { removeRendavousConnection(deleteVector[i]); } return 0;}int MeridianProcess::addTCPConnection( uint64_t in_qid, const NodeIdent& in_remoteNode) { int newSock = socket(AF_INET, SOCK_STREAM, 0); if (newSock == -1) { return -1; } if (setNonBlock(newSock) == -1) { close(newSock); return -1; } if (performConnect(newSock, in_remoteNode.addr, in_remoteNode.port) == -1) { close(newSock); return -1; } pair<uint64_t, NodeIdent>* tmp = new pair<uint64_t, NodeIdent>(in_qid, in_remoteNode); if (tmp == NULL) { close(newSock); return -1; } map<int, pair<uint64_t, NodeIdent>*>::iterator it = g_tcpProbeConnections.find(newSock); if (it != g_tcpProbeConnections.end()) { assert(false); } g_tcpProbeConnections[newSock] = tmp; FD_SET(newSock, &g_writeSet); g_maxFD = MAX(newSock, g_maxFD); return newSock;}int MeridianProcess::addDNSConnection( uint64_t in_qid, const NodeIdent& in_remoteNode) { int newSock = socket(AF_INET, SOCK_DGRAM, 0); if (newSock == -1) { return -1; } if (setNonBlock(newSock) == -1) { close(newSock); return -1; } // Send DNS query for localhost RealPacket* newPacket = new RealPacket(in_remoteNode); if (newPacket == NULL) { close(newSock); return -1; } int packetSize = res_mkquery(QUERY, "localhost", C_IN, T_A, NULL, 0, 0, (u_char*)(newPacket->getPayLoad()), newPacket->getPacketSize()); WARN_LOG_1("DNS packet size is %d\n", packetSize); if (packetSize == -1) { close(newSock); delete newPacket; return -1; } newPacket->setPayLoadSize(packetSize); pair<uint64_t, NodeIdent>* tmp = new pair<uint64_t, NodeIdent>(in_qid, in_remoteNode); if (tmp == NULL) { close(newSock); delete newPacket; return -1; } // Perform actual send of the packet if (performSend(newSock, newPacket) == -1) { close(newSock); delete newPacket; return -1; } delete newPacket; // No longer needed map<int, pair<uint64_t, NodeIdent>*>::iterator it = g_dnsProbeConnections.find(newSock); if (it != g_dnsProbeConnections.end()) { assert(false); } // Add connection to DNS map g_dnsProbeConnections[newSock] = tmp; // Set socket to select loop, wait for response FD_SET(newSock, &g_readSet); g_maxFD = MAX(newSock, g_maxFD); return newSock;}#ifdef PLANET_LAB_SUPPORT#include <netinet/ip_icmp.h>// Taken from "Safe Planetlab Raw Sockets"u_short MeridianProcess::in_cksum( const u_short *addr, register int len, u_short csum) { register int nleft = len; const u_short *w = addr; register u_short answer; register int sum = csum; /** Our algorithm is simple, using a 32 bit accumulator (sum), * we add sequential 16 bit words to it, and at the end, fold * back all the carry bits from the top 16 bits into the lower * 16 bits.*/ while (nleft > 1) { sum += *w++; nleft -= 2; } /* mop up an odd byte, if necessary */ if (nleft == 1) sum += htons(*(u_char *)w << 8); /** add back carry outs from top 16 bits to low 16 bits*/ sum = (sum >> 16) + (sum & 0xffff); /* add hi 16 to low 16 */ sum += (sum >> 16); /* add carry */ answer = ~sum; /* truncate to 16 bits */ return (answer);}int MeridianProcess::createICMPSocket() { // Does the local port have to be coupled to the ICMP id? g_icmpSock = socket(PF_INET, SOCK_RAW, IPPROTO_ICMP); if (g_icmpSock == -1) { ERROR_LOG("Error creating ICMP socket\n"); return -1; } if (setuid(getuid()) == -1) { ERROR_LOG("Cannot lower privilege, exiting\n"); close(g_icmpSock); return -1; } if (setNonBlock(g_icmpSock) == -1) { ERROR_LOG("Error setting the ICMP socket to be non-blocking\n"); close(g_icmpSock); return -1; } // Use the same port number for ICMP as the Meridian port g_icmpPort = g_meridPort; //g_icmpPort = (rand() % ((1 << 16) - 1024)) + 1024; struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_port = htons(g_icmpPort); if (bind(g_icmpSock, (struct sockaddr*)&sin, sizeof(sin)) == -1) { ERROR_LOG("Cannot bind to ICMP socket\n"); close(g_icmpSock); return -1; } int tmpOpt = 1; // Flag set to true if (setsockopt(g_icmpSock, 0, IP_HDRINCL, &tmpOpt, sizeof(tmpOpt)) == -1) { ERROR_LOG("Cannot setsockopt on ICMP socket\n"); close(g_icmpSock); return -1; } return 0;}int MeridianProcess::sendICMPProbe( uint64_t in_qid, uint32_t in_remoteNode) { // Set the remote port to be g_icmpPort for ICMP packets NodeIdent tmpRemote = {in_remoteNode, g_icmpPort}; // Payload is the query id uint16_t icmpPacketSize = sizeof(struct iphdr) + sizeof(struct icmphdr) + sizeof(uint64_t); // Send ICMP query for localhost RealPacket* newPacket = new RealPacket(tmpRemote, icmpPacketSize); if (newPacket == NULL) { ERROR_LOG("Cannot create new packet in sendICMPProbe\n"); return -1; } // Payload size the same size as packet size newPacket->setPayLoadSize(icmpPacketSize); // Clear packet first (may not be necessary actually) memset(newPacket->getPayLoad(), '\0', newPacket->getPacketSize()); // Set IP fields, lots of magic numbers (taken from Bickson docs) struct iphdr* ip_header = (struct iphdr*) newPacket->getPayLoad(); ip_header->ihl = 5; ip_header->version = 4; ip_header->tos = 0; ip_header->tot_len = htons(icmpPacketSize); ip_header->id = rand(); ip_header->ttl = 64; ip_header->frag_off = 0x40; ip_header->protocol = IPPROTO_ICMP; ip_header->check = 0; // Set by the kernel ip_header->daddr = htonl(in_remoteNode); ip_header->saddr = 0; // Source addr blank, set by kernel // Set ICMP fields struct icmphdr* icmp_header = (struct icmphdr*)(newPacket->getPayLoad() + sizeof(struct iphdr)); icmp_header->type = ICMP_ECHO; icmp_header->code = 0; icmp_header->un.echo.id = htons(g_icmpPort); icmp_header->un.echo.sequence = htons(g_icmpSeq++); // Set qid into network order before adding to packet uint32_t qid_1, qid_2; Packet::to32(in_qid, &qid_1, &qid_2); qid_1 = htonl(qid_1); // Write the top 32bits memcpy(newPacket->getPayLoad() + sizeof(struct iphdr) + sizeof(struct icmphdr), &qid_1, sizeof(uint32_t)); qid_2 = htonl(qid_2); // Write the bottom 32bits memcpy(newPacket->getPayLoad() + sizeof(struct iphdr) + sizeof(struct icmphdr) + sizeof(uint32_t), &qid_2, sizeof(uint32_t)); // Calculate and write ICMP checksum icmp_header->checksum = in_cksum((const uint16_t*)icmp_header, sizeof(struct icmphdr) + sizeof(uint64_t), 0); // Push to ICMP waiting queue addICMPOutPacket(newPacket); return 0; }int MeridianProcess::addICMPOutPacket(RealPacket* in_packet) { g_icmpOutPacketList.push_back(in_packet); FD_SET(g_icmpSock, &g_writeSet); g_maxFD = MAX(g_icmpSock, g_maxFD); return 0;}void MeridianProcess::icmpWritePending() { while (true) { assert(!(g_icmpOutPacketList.empty())); RealPacket* firstPacket = g_icmpOutPacketList.front(); if (performSendICMP(g_icmpSock, firstPacket) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; // Retry again later when ready to send } else { // Let's just continute still, but remove this packet ERROR_LOG("Error calling send\n"); } } g_icmpOutPacketList.pop_front(); delete firstPacket; // Done with packet if (g_icmpOutPacketList.empty()) { FD_CLR(g_icmpSock, &g_writeSet); break; // No more to send } }}#define MAX_ICMP_PACKET_SIZE 1400 int MeridianProcess::readICMPPacket() { char buf[MAX_ICMP_PACKET_SIZE]; struct sockaddr_in theirAddr; int addrLen = sizeof(struct sockaddr); // Perform actual recv on socket int numBytes = recvfrom(g_icmpSock, buf, MAX_ICMP_PACKET_SIZE, 0, (struct sockaddr*)&theirAddr, (socklen_t*)&addrLen); if (numBytes == -1) { perror("Error on recvfrom"); return -1; } NodeIdent remoteNode = {ntohl(theirAddr.sin_addr.s_addr), 0}; // Get query id from ICMP ECHO reply uint16_t icmpPacketSize = sizeof(struct iphdr) + sizeof(struct icmphdr) + sizeof(uint64_t); if (numBytes != icmpPacketSize) { //ERROR_LOG("Received ICMP packet size incorrect\n"); WARN_LOG("Received unexpected ICMP packet\n"); return -1; } // Check that it is an ICMP packet struct iphdr* ip_header = (struct iphdr*)buf; if (ip_header->protocol != IPPROTO_ICMP) { WARN_LOG("Received non-ICMP packet from ICMP socket\n"); return -1; } struct icmphdr* icmp_header = (struct icmphdr*)(buf + sizeof(struct iphdr)); // Check to see it is a ECHO reply if (icmp_header->type != ICMP_ECHOREPLY) { WARN_LOG("Received non-ICMP reply packet\n"); return -1; } // Check to see that it is expected if (icmp_header->un.echo.id != htons(g_icmpPort)) { ERROR_LOG("Received unexpected ICMP reply\n"); return -1; } // NOTE: Should probably check checksum in the future // Get the qid of the ICMP packet uint32_t qid_1, qid_2; memcpy(&qid_1, buf + sizeof(struct iphdr) + sizeof(struct icmphdr), sizeof(uint32_t)); memcpy(&qid_2, buf + sizeof(struct iphdr) + sizeof(struct icmphdr) + sizeof(uint32_t), sizeof(uint32_t)); uint64_t qid_no = Packet::to64(ntohl(qid_1), ntohl(qid_2)); WARN_LOG_1("Received qid of value %llu\n", qid_no); // Notify the query with this qid of the ICMP packet g_queryTable.notifyQPacket(qid_no, remoteNode, buf, numBytes); return 0;}int MeridianProcess::performSendICMP(int sock, RealPacket* in_packet) {#ifdef DEBUG u_int netAddr = htonl(in_packet->getAddr()); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_3("Sending query to port number %s:%d of size %d\n", ringNodeStr, in_packet->getPort(), in_packet->getPayLoadSize());#endif struct sockaddr_in hostAddr; memset(&(hostAddr), '\0', sizeof(struct sockaddr_in)); hostAddr.sin_family = PF_INET; hostAddr.sin_port = htons(in_packet->getPort()); hostAddr.sin_addr.s_addr = htonl(in_packet->getAddr()); //memset(&(hostAddr.sin_zero), '\0', 8); int sendRet = sendto(sock, in_packet->getPayLoad(), in_packet->getPayLoadSize(), 0, (struct sockaddr*)&hostAddr, sizeof(struct sockaddr)); return sendRet;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -