📄 meridianprocess.cpp
字号:
} newQuery->init(); } return 0;}int MeridianProcess::performRingManagement() { // Find all full rings int numRings = g_rings->getNumberOfRings(); vector<int> eligibleRings; for (int i = 0; i < numRings; i++) { // Test if the ring is eligible for ring management if (g_rings->eligibleForReplacement(i)) { eligibleRings.push_back(i); } } if (eligibleRings.empty()) { return 0; } // Pick a random eligible ring int selectedRing = eligibleRings[rand() % eligibleRings.size()]; RingManageQuery* newQuery = new RingManageQuery(selectedRing, this); if (g_queryTable.insertNewQuery(newQuery) == -1) { delete newQuery; return -1; } newQuery->init(); return 0; }int MeridianProcess::addOutPacket(RealPacket* in_packet) { g_outPacketList.push_back(in_packet); FD_SET(g_meridSock, &g_writeSet); g_maxFD = MAX(g_meridSock, g_maxFD); return 0;}void MeridianProcess::writePending() { while (true) { assert(!(g_outPacketList.empty())); RealPacket* firstPacket = g_outPacketList.front(); if (performSend(g_meridSock, firstPacket) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; // Retry again later when ready to send } else { // Let's just continute still, but remove this packet ERROR_LOG("Error calling send\n"); } } g_outPacketList.pop_front(); delete firstPacket; // Done with packet if (g_outPacketList.empty()) { FD_CLR(g_meridSock, &g_writeSet); break; // No more to send } }}int MeridianProcess::performSend(int sock, RealPacket* in_packet) {#ifdef DEBUG u_int netAddr = htonl(in_packet->getAddr()); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_3("Sending query to port number %s:%d of size %d\n", ringNodeStr, in_packet->getPort(), in_packet->getPayLoadSize());#endif // Handle firewall host by wrapping it around a PUSH packet if (in_packet->getRendvAddr() != 0 || in_packet->getRendvPort() != 0) {#ifdef DEBUG u_int netAddr = htonl(in_packet->getRendvAddr()); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("Redirecting to rendavous node, %s:%d\n", ringNodeStr, in_packet->getRendvPort());#endif // QID for push packet should never be used, just set it to 0 PushPacket pushPacket(0, in_packet->getAddr(), in_packet->getPort()); NodeIdent rendvNode = {in_packet->getRendvAddr(), in_packet->getRendvPort()}; // This packet MUST not have a rendavous host RealPacket tmpPacket(rendvNode); if (pushPacket.createRealPacket(tmpPacket) == -1) { ERROR_LOG("Cannot create PUSH packet\n"); return -1; } tmpPacket.append_packet(*in_packet); if (!(tmpPacket.completeOkay())) { ERROR_LOG("Cannot create PUSH packet\n"); return -1; } return performSend(sock, &tmpPacket); } struct sockaddr_in hostAddr; //memset(&(hostAddr), '\0', sizeof(struct sockaddr_in)); hostAddr.sin_family = AF_INET; hostAddr.sin_port = htons(in_packet->getPort()); hostAddr.sin_addr.s_addr = htonl(in_packet->getAddr()); memset(&(hostAddr.sin_zero), '\0', 8); int sendRet = sendto(sock, in_packet->getPayLoad(), in_packet->getPayLoadSize(), 0, (struct sockaddr*)&hostAddr, sizeof(struct sockaddr)); return sendRet;}int MeridianProcess::readPacket() { char buf[MAX_UDP_PACKET_SIZE]; struct sockaddr_in theirAddr; int addrLen = sizeof(struct sockaddr); // Perform actual recv on socket int numBytes = recvfrom(g_meridSock, buf, MAX_UDP_PACKET_SIZE, 0, (struct sockaddr*)&theirAddr, (socklen_t*)&addrLen); if (numBytes == -1) { perror("Error on recvfrom"); return -1; } NodeIdent remoteNode = {ntohl(theirAddr.sin_addr.s_addr), ntohs(theirAddr.sin_port) }; return handleNewPacket(buf, numBytes, remoteNode);}int MeridianProcess::handleNewPacket( char* buf, int numBytes, const NodeIdent& remoteNode) { BufferWrapper rb(buf, numBytes); char queryType; uint64_t queryID; if (Packet::parseHeader(rb, &queryType, &queryID) != -1) { switch (queryType) { case PUSH: { RealPacket* inPacket = PushPacket::parse(remoteNode, buf, numBytes); NodeIdent destIdent = {inPacket->getAddr(), inPacket->getPort()};#ifdef DEBUG u_int netAddr = htonl(destIdent.addr); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("PUSH dest is %s:%d\n", ringNodeStr, destIdent.port);#endif map<NodeIdent, int, ltNodeIdent>::iterator rendvIt = g_rendvConnections.find(destIdent); if (rendvIt == g_rendvConnections.end()) { ERROR_LOG("Node not rendavous for target\n"); delete inPacket; } else { map<int, list<RealPacket*>*>::iterator rendvQIt = g_rendvQueue.find(rendvIt->second); if (rendvQIt == g_rendvQueue.end()) { ERROR_LOG("Inconsistent rendavous state\n"); delete inPacket; // Let's try to fix it anyway g_rendvConnections.erase(rendvIt); } else { // Push it into queue and then on fd in writeSet rendvQIt->second->push_back(inPacket); FD_SET(rendvQIt->first, &g_writeSet); g_maxFD = MAX(rendvQIt->first, g_maxFD); } } } break; case PING: { PongPacket pongPacket(queryID); RealPacket* inPacket = new RealPacket(remoteNode); if (pongPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { addOutPacket(inPacket); } } break; case GOSSIP: case GOSSIP_PULL: {#ifdef DEBUG if (queryType == GOSSIP) { WARN_LOG("Received a GOSSIP packet\n"); } else { WARN_LOG("Received a GOSSIP_PULL packet\n"); }#endif GossipPacketGeneric* tmp = NULL; if (queryType == GOSSIP) { tmp = GossipPacketGeneric:: parse<GossipPacketPush>(buf, numBytes); } else { tmp = GossipPacketGeneric:: parse<GossipPacketPull>(buf, numBytes); } if (tmp != NULL) { // Add remote node to ring NodeIdentRendv remoteNodeRendv = { remoteNode.addr, remoteNode.port, tmp->getRendvAddr(), tmp->getRendvPort() };#ifdef DEBUG u_int netAddr = htonl(tmp->getRendvAddr()); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); WARN_LOG_2("Rendv in GOSSIP packet is %s:%d\n", ringNodeStr, tmp->getRendvPort());#endif addNodeToRing(remoteNodeRendv); // Add nodes in gossip packet to ring const vector<NodeIdentRendv>* tmpVect = tmp->returnTargets(); for (u_int i = 0; i < tmpVect->size(); i++) { //NodeIdentRendv tmpRendv = (*tmpVect)[i]; addNodeToRing((*tmpVect)[i]); }#ifdef GOSSIP_PUSHPULL if (queryType == GOSSIP) { GossipPacketPull gPacket(getNewQueryID(), g_rendvNode.addr, g_rendvNode.port); if (GossipQuery::fillGossipPacket(gPacket, remoteNodeRendv, this) == 0) { WARN_LOG("Creating GOSSIP_PULL ###########\n"); RealPacket* inPacket = new RealPacket(remoteNodeRendv); if (gPacket.createRealPacket(*inPacket) == -1) { delete inPacket; } else { addOutPacket(inPacket); } } }#endif delete tmp; // Finished with gossip packet } } break;#ifdef PLANET_LAB_SUPPORT case REQ_CONSTRAINT_N_ICMP: { handleMCReq<ReqConstraintICMP, HandleMCICMP>( queryID, remoteNode, buf, numBytes); } break;#endif case REQ_CONSTRAINT_N_PING: { handleMCReq<ReqConstraintPing, HandleMCPing>( queryID, remoteNode, buf, numBytes); } break; case REQ_CONSTRAINT_N_DNS: { handleMCReq<ReqConstraintDNS, HandleMCDNS>( queryID, remoteNode, buf, numBytes); } break; case REQ_CONSTRAINT_N_TCP: { handleMCReq<ReqConstraintTCP, HandleMCTCP>( queryID, remoteNode, buf, numBytes); } break;#ifdef PLANET_LAB_SUPPORT case REQ_CLOSEST_N_ICMP: { handleClosestReq<ReqClosestICMP, HandleClosestICMP>( queryID, remoteNode, buf, numBytes); } break;#endif case REQ_CLOSEST_N_MERID_PING: { handleClosestReq<ReqClosestMeridPing, HandleClosestPing>( queryID, remoteNode, buf, numBytes); } break; case REQ_CLOSEST_N_DNS: { handleClosestReq<ReqClosestDNS, HandleClosestDNS>( queryID, remoteNode, buf, numBytes); } break; case REQ_CLOSEST_N_TCP: { handleClosestReq<ReqClosestTCP, HandleClosestTCP>( queryID, remoteNode, buf, numBytes); } break; case RET_RESPONSE: { WARN_LOG("Received a RET_RESPONSE packet\n"); g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break; case RET_INFO: { WARN_LOG("Received a RET_INFO packet\n"); g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break; case RET_ERROR: { WARN_LOG("Received a RET_ERROR packet\n"); g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break; case PONG: { WARN_LOG("Received PONG packet\n"); g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break; case REQ_MEASURE_N_MERID_PING: { WARN_LOG("Received ReqMeasurePing packet\n"); handleMeasureReq<ReqMeasurePing, HandleReqPing>( remoteNode, buf, numBytes); } break; case REQ_MEASURE_N_TCP: { WARN_LOG("Received ReqMeasureTCP packet\n"); handleMeasureReq<ReqMeasureTCP, HandleReqTCP>( remoteNode, buf, numBytes); } break; case REQ_MEASURE_N_DNS: { WARN_LOG("Received ReqMeasureDNS packet\n"); handleMeasureReq<ReqMeasureDNS, HandleReqDNS>( remoteNode, buf, numBytes); } break;#ifdef PLANET_LAB_SUPPORT case REQ_MEASURE_N_ICMP: { WARN_LOG("Received ReqMeasureICMP packet\n"); handleMeasureReq<ReqMeasureICMP, HandleReqICMP>( remoteNode, buf, numBytes); } break;#endif case RET_PING_REQ: { WARN_LOG("Received a RET_PING\n"); g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break;#ifdef MERIDIAN_DSL case DSL_REPLY: { g_queryTable.notifyQPacket( queryID, remoteNode, buf, numBytes); } break; case DSL_REQUEST: { ParserState* new_state = new ParserState(); if (new_state == NULL) { break; // Cannot create new parser state } DSLRequestPacket* inPacket = DSLRequestPacket::parse(new_state, buf, numBytes); if (inPacket == NULL) { delete new_state; break; // Cannot parse packet } // Get necessary data from packet NodeIdentRendv remoteNodeRendv = { remoteNode.addr, remoteNode.port, inPacket->getRendvAddr(), inPacket->getRendvPort() }; uint64_t prevID = inPacket->retReqID(); uint16_t q_timeout = inPacket->timeout_ms(); uint16_t q_ttl = inPacket->getTTL(); //printf("Timeout of %d ms\n", q_timeout); delete inPacket; // No longer needed // Drop packets that have too large TTLs if (q_ttl > g_max_ttl) { delete new_state; break; } g_parser_line = 1; // Reset line count for parser if (yyparse((void*)new_state) == -1) { delete new_state; break; // Parse error } if (new_state->save_context() == -1) { delete new_state; break; // Error saving context } makecontext(new_state->get_context(), (void (*)())(&jmp_eval), 1, new_state); DSLRecvQuery* newQ = new DSLRecvQuery(new_state, this, remoteNodeRendv, prevID, q_timeout, q_ttl); if (newQ == NULL) { delete new_state; break; // Error creating new Query } // Associate parser state with query id new_state->setQuery(newQ); // Associate parser with this meridian process new_state->setMeridProcess(this); if (g_queryTable.insertNewQuery(newQ) == -1) { delete newQ; // State is deleted with newQ } else { newQ->init(); g_psList.push_back(newQ->getQueryID()); FD_SET(g_dummySock, &g_writeSet); g_maxFD = MAX(g_dummySock, g_maxFD); } } break;#endif default: break; } } return 0;}int MeridianProcess::getInfoPacket(RealPacket& inPacket) { int pos = inPacket.getPayLoadSize(); char* buf = inPacket.getPayLoad(); int packetSize = inPacket.getPacketSize(); // This is taken from old version of code struct timeval tvStart, tvEnd; gettimeofday(&tvStart, NULL); pos += snprintf(buf + pos, packetSize - pos, "HTTP/1.1 200 OK\r\n"); pos += snprintf(buf + pos, packetSize - pos, "Content-Type: text/html; charset=iso-8859-1\r\n\r\n"); pos += snprintf(buf + pos, packetSize - pos, "<HTML>\n" "<title>Meridian</title>\n<body>\n"); pos += snprintf(buf + pos, packetSize - pos, "<H2>Meridian node: %s</H2>\n", g_hostname); pos += snprintf(buf + pos, packetSize - pos, "<HR SIZE=\"0\">\n"); pos += snprintf(buf + pos, packetSize - pos, "<STRONG>Ring members of %s</STRONG>\n", g_hostname); pos += snprintf(buf + pos, packetSize - pos, "<TABLE cellspacing=\"3\" cellPadding=\"3\" width=\"100%s\" " "border=\"1\">\n<TBODY>\n", "%"); int numRings = g_rings->getNumberOfRings(); for (int i = 0; i < numRings; i++) { const vector<NodeIdent>* primRing = g_rings->returnPrimaryRing(i); if (primRing != NULL && primRing->size() > 0) { pos += snprintf(buf + pos, packetSize - pos, "<TR><TD><STRONG>Nodes in ring %d</STRONG></TD>" "<TD><STRONG>Latency</STRONG></TD></TR>\n", i); for (u_int j = 0; j < primRing->size(); j++) { u_int latencyUS = 0; if (g_rings->getNodeLatency((*primRing)[j], &latencyUS) == -1) { assert(false); } u_int netAddr = htonl((*primRing)[j].addr); char* ringNodeStr = inet_ntoa(*(struct in_addr*)&(netAddr)); pos += snprintf(buf + pos, packetSize - pos, "<TR><TD>%s</TD><TD>%0.3f ms</TD></TR>\n", ringNodeStr, latencyUS / 1000.0); } } } pos += snprintf(buf + pos, packetSize - pos, "</TBODY>\n</TABLE>\n"); gettimeofday(&tvEnd, NULL); pos += snprintf(buf + pos, packetSize - pos, "<BR>Time to create this page is %0.2f ms\n", ((tvEnd.tv_sec - tvStart.tv_sec) * 1000000 + (tvEnd.tv_usec - tvStart.tv_usec)) / 1000.0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -