⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kelips.h

📁 比较权威的p2p仿真软件
💻 H
字号:
#ifndef __KELIPS_H#define __KELIPS_H// To Do:// OOPS short gossip intervals don't work right since gossip() blocks// there is something wrong with proximity routing, not fast enuf// log timeouts' contribution to rtt// need to parameterize number of nodes to ping per gossip round// Differences from Kelips as described in IPTPS 2003 paper:// I look up IDs, not files.// When/how do you learn RTT to a new node? So you know whether to//   keep it in your contacts list.// How often does a node generate a new heartbeat value?// When does a node send Info for itself? random? every gossip?// For how many rounds do they gossip a new item? (1 seems to work best)// Do existing items with new heartbeats count as new for gossip?// How big are the rations mentions in 2.1? In the eval.//   For group vs contact, and also new vs old.// Pull at join and then all push often leaves isolated nodes,//   since maybe nobody pushes to me. So what happens at join?// ***// What heartbeat timeout values?// The contacts tend to time out a lot...// Replacing contacts with newer heartbeat helps convergence a lot.//   As well as reducing contact timeout dramatically.// Are lookups iterative or recursive? Who controls various retries?// To do:// why does one-hop (k=1) w/ n=1024 suck so much?//   should be fabulous if you gossip enough!// gossip to nearby nodes preferentially?// hmm, we get a LOT of lookup failures in kx.pl runs, like 1/2 of lookups//   any way to make it more persistent?// why doesn't 1024 get down to 1 RTT?//   not moving info fast enough?// examine effect of each paramter individually// maybe try parameters that move more info?// is there an explicit "dead" flag on gossip entries?// Does it stabilize after the expected number of rounds?// Gossip w/o favoring new nodes (nnodes: avg median)://   100: 10 10//   400: 27 27// With gossiping new nodes first for 1 round://   100: 10 11//   400: 25 29// Replacing contacts with newer heartbeat, _item_rounds = 1://   100:  6  5//   400: 13 14// Oy, _item_rounds=0 beats this by a lot.#include "p2psim/p2protocol.h"#include "consistenthash.h"#include "p2psim/p2psim.h"#include "p2psim/condvar.h"#include "p2psim/network.h"#include <map>#include <vector>#include <set>#ifdef HAVE_LIBGBextern "C" {  #include "gb_graph.h"   #include <gb_dijk.h>}#endifclass Kelips : public P2Protocol {public:  typedef u_int ID;  struct lookup_args{    ID key;    Time start;    u_int retrytimes;    vector<IPAddress> history;    Time total_to;    u_int num_to;  };  struct lookup1_args{    ID key;    IPAddress dst_ip;  };  Kelips(IPAddress i, Args a);  virtual ~Kelips();  string proto_name() { return "Kelips"; }  // Functions callable from events file.  virtual void join(Args*);  virtual void leave(Args*);  virtual void crash(Args*);  virtual void lookup(Args*);  virtual void insert(Args*);  virtual void nodeevent (Args *) {};  void lookup_internal(lookup_args *a);  void add_edge(int *matrix, int sz); private:  // typedef ConsistentHash::CHID ID;  // Parameters, with defaults from Kelips paper.  int _round_interval;    // (2000) inter-gossip interval.  u_int _group_targets;   // (3) gossip to X targets in group.  u_int _contact_targets; // (3) gossip to X contacts.  u_int _group_ration;    // (4) group items per gossip packet.  u_int _contact_ration;  // (2) contact items per gossip packet.  int _n_contacts;        // (2) contacts to remember per foreign group.  int _item_rounds;       // (1???) how many times to gossip a new item.  Time _max_lookup_time;  u_int _purge_time;      // period the purge timer is run  u_int _track_conncomp_timer;   u_int _to_multiplier; //a retransmit timer  u_int _to_cheat;    int _timeout;           // (25000???) milliseconds to group item timeout.  // hearbeat timeouts. XXX not specified in paper.  /*  static const int _group_timeout = 25000;  static const int _contact_timeout = 50000;  */  int _k; // number of affinity groups, should be sqrt(n)  bool _started; // are our timers ticking?  bool _live;    // are we joined but not crashed?  // global statistics  static double _rpc_bytes; // total traffic  static double _good_latency; // successful lookups  static double _good_hops;  static int _good_lookups;  static int _ok_failures;  static int _bad_failures;  // Information about one other node.  class Info {  public:    IPAddress _ip;    Time _heartbeat; // when _ip last spoke to anyone.    int _rounds;     // how many rounds to send for.    int _rtt;        // measured by us, -1 if not valid    Info(IPAddress ip, Time hb) {       _ip = ip; _heartbeat = hb; _rounds = 0; _rtt = -1;    }    Info() { _ip = 0; _heartbeat = 0; _rounds = 0; _rtt = -1; }    int age() const { return now() - _heartbeat; }  };  // Set of nodes that this node knows about.  // This is the paper's Affinity Group View and Contacts.  map<IPAddress, Info *> _info;  void gotinfo(Info i, int rtt);  ID ip2id(IPAddress xip) {    return xip;    // return ConsistentHash::ip2chid(xip);  }  int id2group(ID id) { return(id % _k); }  int ip2group(IPAddress xip) { return(id2group(ip2id(xip))); }  ID id() { return ip2id(ip()); }  int group() { return ip2group(ip()); }  void gossip(void *);  void handle_gossip(vector<Info> *, void *);  void purge(void *);  vector<IPAddress> all();  vector<IPAddress> grouplist(int g);  vector<IPAddress> notgrouplist(int g);  void check(bool doprint);  void handle_join(IPAddress *caller, vector<Info> *ret);  vector<Info> gossip_msg(int g, u_int, u_int);  vector<IPAddress> randomize(vector<IPAddress> a);  vector<IPAddress> newold(vector<IPAddress> a, bool xnew);  void newold_msg(vector<Info> &msg, vector<IPAddress> l, u_int ration);  void handle_lookup_final(ID *kp, bool *done);  void handle_lookup1(lookup1_args *kp, IPAddress *res);  bool lookup1(lookup_args *a);  bool lookupvia(lookup_args *a, IPAddress via);  bool lookup2(lookup_args *a);  bool lookup_loop(lookup_args *a);  void handle_lookup2(ID *kp, IPAddress *res);  IPAddress find_by_id(ID key);  void init_state(const set<Node*>*);  bool stabilized(vector<ID> lid);  void rpcstat(bool ok, IPAddress dst, int latency, int nitems, uint type);  IPAddress victim(int g);  void handle_ping(void *, void *);  inline int contact_score(const Info &i);  IPAddress closest_contact(int g);  bool node_key_alive(ID key);  // RPC that records RTT.  template<class AT, class RT>    bool xRPC(IPAddress dst, int nitems, void (Kelips::* fn)(AT *, RT *),              AT *args, RT *ret, uint type=1, Time *total_to=NULL, uint *num_to=NULL){    Time t1 = now();    Time timeout = 1000;    if (_to_cheat) {      timeout = _to_multiplier * 2 * (Network::Instance()->gettopology())->latency(ip(),dst);    }else if ((_info.find(dst) == _info.end())       || (_info[dst]->_rtt == 9999) || 	(_info[dst]->_rtt == -1)) {      timeout = 1000; //1 second timeout    } else {      timeout = _to_multiplier * _info[dst]->_rtt;    }    bool ok = doRPC(dst, fn, args, ret, timeout);    rpcstat(ok, dst, now() - t1, nitems,type);    if (!ok) {      if (total_to) *total_to += timeout; //record the timeout incurred      if (num_to) *num_to += 1;    }    return ok;  };  friend class KelipsObserver;};#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -