📄 marshal.h
字号:
uint32_t latencyUS = ntohl(rb.retrieve_uint()); tmpMap[tmpIdent] = latencyUS; } if (rb.error()) { return NULL; } RetResponse* ret = new RetResponse(queryID, closestAddr, closestPort, tmpMap); return ret; } virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); inPacket.append_uint(htonl(addr)); inPacket.append_ushort(htons(port)); inPacket.append_uint(htonl(targets.size())); for (uint32_t i = 0; i < targets.size(); i++) { NodeIdentLat tmpIdent = targets[i]; inPacket.append_uint(htonl(tmpIdent.addr)); inPacket.append_ushort(htons(tmpIdent.port)); inPacket.append_uint(htonl(tmpIdent.latencyUS)); } //inPacket.append_uint(htonl(addr)); //inPacket.append_ushort(htons(port)); if (!inPacket.completeOkay()) { return -1; } return 0; // Packet completed correctly } virtual char getPacketType() const { return RET_RESPONSE; } virtual ~RetResponse() {} };class GossipPacketGeneric : public RendvHeaderPacket {protected: vector<NodeIdentRendv> targets; public: GossipPacketGeneric(uint64_t id, uint32_t in_rendv_addr, uint16_t in_rendv_port) : RendvHeaderPacket(id, in_rendv_addr, in_rendv_port) {} template <class T> static GossipPacketGeneric* parse(const char* buf, int numBytes) { BufferWrapper rb(buf, numBytes); char queryType = rb.retrieve_char(); if (rb.error() || queryType != T::type()) { return NULL; } uint32_t queryID_1 = ntohl(rb.retrieve_uint()); uint32_t queryID_2 = ntohl(rb.retrieve_uint()); uint64_t queryID = to64(queryID_1, queryID_2); uint32_t magicNumber = ntohl(rb.retrieve_uint()); uint32_t outerRAddr = ntohl(rb.retrieve_uint()); uint16_t outerRPort = ntohs(rb.retrieve_ushort()); if (rb.error() || magicNumber != MAGIC_NUMBER) { return NULL; } GossipPacketGeneric* ret = new T(queryID, outerRAddr, outerRPort); uint32_t numEntry = ntohl(rb.retrieve_uint()); for (uint32_t i = 0; (!rb.error() && i < numEntry); i++) { uint32_t addr = ntohl(rb.retrieve_uint()); uint16_t port = ntohs(rb.retrieve_ushort()); uint32_t rAddr = ntohl(rb.retrieve_uint()); uint16_t rPort = ntohs(rb.retrieve_ushort()); ret->addNode(addr, port, rAddr, rPort); } if (rb.error()) { delete ret; return NULL; } return ret; } const vector<NodeIdentRendv>* returnTargets() { return &targets; } virtual int createRealPacket(RealPacket& inPacket) const { uint32_t num_targets = targets.size(); // Must have at least one packet inPacket.append_char(getPacketType()); write_id(inPacket); write_rendv(inPacket); inPacket.append_uint(htonl(num_targets)); for (uint32_t i = 0; i < num_targets; i++) { NodeIdentRendv tmp = targets[i]; inPacket.append_uint(htonl(tmp.addr)); inPacket.append_ushort(htons(tmp.port)); inPacket.append_uint(htonl(tmp.addrRendv)); inPacket.append_ushort(htons(tmp.portRendv)); } if (!inPacket.completeOkay()) { return -1; } return 0; // Packet completed correctly } virtual char getPacketType() const = 0; void addNode(uint32_t addr, uint16_t port, uint32_t rendv_addr, uint16_t rendv_port) { NodeIdentRendv tmp = {addr, port, rendv_addr, rendv_port}; targets.push_back(tmp); } virtual ~GossipPacketGeneric() {} };class GossipPacketPush : public GossipPacketGeneric {public: GossipPacketPush(uint64_t id, uint32_t in_rendv_addr, uint16_t in_rendv_port) : GossipPacketGeneric(id, in_rendv_addr, in_rendv_port) {} virtual ~GossipPacketPush() {} template <class T> static GossipPacketGeneric* parse(const char* buf, int numBytes) { assert(false); return NULL; } static char type() { return GOSSIP; } virtual char getPacketType() const { return type(); } };class GossipPacketPull : public GossipPacketGeneric {public: GossipPacketPull(uint64_t id, uint32_t in_rendv_addr, uint16_t in_rendv_port) : GossipPacketGeneric(id, in_rendv_addr, in_rendv_port) {} virtual ~GossipPacketPull() {} template <class T> static GossipPacketGeneric* parse(const char* buf, int numBytes) { assert(false); return NULL; } static char type() { return GOSSIP_PULL; } virtual char getPacketType() const { return type(); } };class PullPacket : public Packet {protected: uint32_t srcIP; uint16_t srcPort; uint32_t payLoadSize;public: PullPacket(uint64_t id, uint32_t in_src_ip, uint16_t in_src_port, uint32_t in_payLoadSize) : Packet(id), srcIP(in_src_ip), srcPort(in_src_port), payLoadSize(in_payLoadSize) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); inPacket.append_uint(htonl(srcIP)); inPacket.append_ushort(htons(srcPort)); inPacket.append_uint(htonl(payLoadSize)); if (!inPacket.completeOkay()) { return -1; } return 0; } static RealPacket* parse(RealPacket& inPacket, NodeIdent& srcNode) { BufferWrapper rb(inPacket.getPayLoad(), inPacket.getPayLoadSize()); char queryType = rb.retrieve_char(); if (rb.error() || queryType != PULL) { return NULL; } rb.retrieve_uint(); // Skipping queryid_1 rb.retrieve_uint(); // Skipping queryid_2 uint32_t magicNumber = ntohl(rb.retrieve_uint()); uint32_t in_srcIP = ntohl(rb.retrieve_uint()); uint16_t in_srcPort = ntohs(rb.retrieve_ushort()); uint32_t in_payLoadSize = ntohl(rb.retrieve_uint()); if (rb.error() || magicNumber != MAGIC_NUMBER) { return NULL; } if (rb.remainBufSize() < in_payLoadSize) { return NULL; } // Destination doesn't matter, since it is not actually being sent NodeIdent dummy = {0, 0}; RealPacket* newPacket = new RealPacket(dummy, in_payLoadSize); if (newPacket != NULL) { srcNode.addr = in_srcIP; srcNode.port = in_srcPort; memcpy(newPacket->getPayLoad(), inPacket.getPayLoad() + rb.returnPos(), in_payLoadSize); newPacket->setPayLoadSize(in_payLoadSize); } // Regardless of whether the "new RealPacket" succeeded or not, // return newPacket if (rb.remainBufSize() > in_payLoadSize) { //printf("Must perform memmove\n"); // Move end parts of the buffer into beginning part // Must use memmove, the buffer might overlap memmove(inPacket.getPayLoad(), inPacket.getPayLoad() + rb.returnPos() + in_payLoadSize, inPacket.getPayLoadSize() - (rb.returnPos() + in_payLoadSize)); inPacket.setPayLoadSize( inPacket.getPayLoadSize() - (rb.returnPos() + in_payLoadSize)); } else { inPacket.setPayLoadSize(0); } return newPacket; } virtual char getPacketType() const { return PULL; } virtual ~PullPacket() {} };class PushPacket : public Packet {protected: uint32_t destIP; uint16_t destPort;public: PushPacket(uint64_t id, uint32_t in_dest_ip, uint16_t in_dest_port) : Packet(id), destIP(in_dest_ip), destPort(in_dest_port) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); inPacket.append_uint(htonl(destIP)); inPacket.append_ushort(htons(destPort)); if (!inPacket.completeOkay()) { return -1; } return 0; } static RealPacket* parse( const NodeIdent& remoteNode, const char* buf, int numBytes) { BufferWrapper rb(buf, numBytes); char queryType = rb.retrieve_char(); if (rb.error() || queryType != PUSH) { return NULL; } rb.retrieve_uint(); // Skipping queryid_1 rb.retrieve_uint(); // Skipping queryid_2 uint32_t magicNumber = ntohl(rb.retrieve_uint()); uint32_t in_destIP = ntohl(rb.retrieve_uint()); uint16_t in_destPort = ntohs(rb.retrieve_ushort()); if (rb.error() || magicNumber != MAGIC_NUMBER) { return NULL; } // Create a ret packet NodeIdent destIdent = {in_destIP, in_destPort}; RealPacket* retPacket = new RealPacket(destIdent); PullPacket tmpPull(0, remoteNode.addr, remoteNode.port, rb.remainBufSize()); if (tmpPull.createRealPacket(*retPacket) == -1) { delete retPacket; return NULL; } // Append the original packet retPacket->append_str(buf + rb.returnPos(), rb.remainBufSize()); if (!(retPacket->completeOkay())) { delete retPacket; return NULL; } return retPacket; } virtual char getPacketType() const { return PUSH; } virtual ~PushPacket() {}};class RetPing : public Packet {protected: vector<NodeIdentLat> nodes;public: RetPing(uint64_t id) : Packet(id) {} static RetPing* parse(const char* buf, int numBytes) { BufferWrapper rb(buf, numBytes); char queryType = rb.retrieve_char(); if (rb.error() || queryType != RET_PING_REQ) { return NULL; } uint32_t queryID_1 = ntohl(rb.retrieve_uint()); uint32_t queryID_2 = ntohl(rb.retrieve_uint()); uint64_t queryID = to64(queryID_1, queryID_2); uint32_t magicNumber = ntohl(rb.retrieve_uint()); if (rb.error() || magicNumber != MAGIC_NUMBER) { return NULL; } RetPing* ret = new RetPing(queryID); uint32_t numEntry = ntohl(rb.retrieve_uint()); NodeIdent tmpIdent; //while (!rb.error() && numEntry-- > 0) { for (uint32_t i = 0; (!rb.error() && i < numEntry); i++) { tmpIdent.addr = ntohl(rb.retrieve_uint()); tmpIdent.port = ntohs(rb.retrieve_ushort()); uint32_t latencyUS = ntohl(rb.retrieve_uint()); ret->addNode(tmpIdent, latencyUS); } if (rb.error()) { delete ret; return NULL; } return ret; } const vector<NodeIdentLat>* returnNodes() { return &nodes; } virtual int createRealPacket(RealPacket& inPacket) const { uint32_t num_nodes = nodes.size(); inPacket.append_char(getPacketType()); write_id(inPacket); inPacket.append_uint(htonl(num_nodes)); for (uint32_t i = 0; i < num_nodes; i++) { NodeIdentLat tmp = nodes[i]; inPacket.append_uint(htonl(tmp.addr)); inPacket.append_ushort(htons(tmp.port)); inPacket.append_uint(htonl(tmp.latencyUS)); } if (!inPacket.completeOkay()) { return -1; } return 0; } void addNode(const NodeIdent& in_node, uint32_t in_latency_us) { NodeIdentLat tmp = {in_node.addr, in_node.port, in_latency_us}; nodes.push_back(tmp); } virtual char getPacketType() const { return RET_PING_REQ; } virtual ~RetPing() {} };class PingPacket : public Packet {public: PingPacket(uint64_t id) : Packet(id) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return PING; } virtual ~PingPacket() {} };class PongPacket : public Packet {public: PongPacket(uint64_t id) : Packet(id) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return PONG; } virtual ~PongPacket() {} };class CreateRendv : public Packet {public: CreateRendv(uint64_t id) : Packet(id) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return CREATE_RENDV; } virtual ~CreateRendv() {} };class RetRendv : public Packet {public: RetRendv(uint64_t id) : Packet(id) {} virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return RET_RENDV; } virtual ~RetRendv() {} }; class InfoPacket : public Packet {private: const RingSet* rings;public: InfoPacket(uint64_t id, const RingSet* in_rings) : Packet(id), rings(in_rings) {} virtual int createRealPacket(RealPacket& inPacket) const; static int parse(const char* buf, int numBytes, map<u_int, vector<NodeIdentLat>*>& inMap); virtual char getPacketType() const { return INFO_PACKET; } virtual ~InfoPacket() {} };#ifdef MERIDIAN_DSL#include "MQLState.h"#include "MeridianDSL.h"class DSLReplyPacket : public Packet {public: DSLReplyPacket(uint64_t id) : Packet(id) {} static DSLReplyPacket* parse(ParserState* ps, const char* buf, int numBytes, ASTNode** ret_node) { BufferWrapper rb(buf, numBytes); char queryType = rb.retrieve_char(); if (rb.error() || queryType != DSL_REPLY) { ERROR_LOG("Wrong type received\n"); return NULL; } uint32_t queryID_1 = ntohl(rb.retrieve_uint()); uint32_t queryID_2 = ntohl(rb.retrieve_uint()); uint64_t queryID = to64(queryID_1, queryID_2); uint32_t magicNumber = ntohl(rb.retrieve_uint()); if (rb.error() || magicNumber != MAGIC_NUMBER) { ERROR_LOG("Wrong magic number in packet received\n"); return NULL; } DSLReplyPacket* ret = new DSLReplyPacket(queryID); *ret_node = unmarshal_ast(ps, &rb); return ret; } // Note: Need to append actual payload using the marshal_packet call // This just creates the necessary Meridian headers virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return DSL_REPLY; } virtual ~DSLReplyPacket() {} };class DSLRequestPacket : public RendvHeaderPacket {private: uint16_t ms_remain; uint16_t ttl;public: DSLRequestPacket(uint64_t id, uint16_t in_ms_remain, uint16_t in_ttl, uint32_t in_rendv_addr, uint16_t in_rendv_port) : RendvHeaderPacket(id, in_rendv_addr, in_rendv_port), ms_remain(in_ms_remain), ttl(in_ttl) {} static DSLRequestPacket* parse( ParserState* ps, const char* buf, int numBytes) { BufferWrapper rb(buf, numBytes); char queryType = rb.retrieve_char(); if (rb.error() || queryType != DSL_REQUEST) { ERROR_LOG("Wrong type received\n"); return NULL; } uint32_t queryID_1 = ntohl(rb.retrieve_uint()); uint32_t queryID_2 = ntohl(rb.retrieve_uint()); uint64_t queryID = to64(queryID_1, queryID_2); uint32_t magicNumber = ntohl(rb.retrieve_uint()); uint32_t rendvAddr = ntohl(rb.retrieve_uint()); uint16_t rendvPort = ntohs(rb.retrieve_ushort()); uint16_t cur_remain = ntohs(rb.retrieve_ushort()); uint16_t cur_ttl = ntohs(rb.retrieve_ushort()); if (rb.error() || magicNumber != MAGIC_NUMBER) { ERROR_LOG("Wrong magic number in packet received\n"); return NULL; } DSLRequestPacket* ret = new DSLRequestPacket( queryID, cur_remain, cur_ttl, rendvAddr, rendvPort); if (unmarshal_packet(*ps, rb) == -1 || rb.error()) { delete ret; return NULL; } return ret; } // Note: Need to append actual payload using the marshal_packet call // This just creates the necessary Meridian headers virtual int createRealPacket(RealPacket& inPacket) const { inPacket.append_char(getPacketType()); write_id(inPacket); write_rendv(inPacket); inPacket.append_ushort(htons(ms_remain)); inPacket.append_ushort(htons(ttl)); if (!inPacket.completeOkay()) { return -1; } return 0; } virtual char getPacketType() const { return DSL_REQUEST; } uint16_t timeout_ms() const { return ms_remain; } uint16_t getTTL() const { return ttl; } virtual ~DSLRequestPacket() {} };#endif#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -