⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chord.c

📁 比较权威的p2p仿真软件
💻 C
📖 第 1 页 / 共 5 页
字号:
/*
 * Copyright (c) 2003-2005 Jinyang Li
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include "observers/chordobserver.h"
#include "accordion.h"
#include <iostream>
#include <stdio.h>
#include <assert.h>
#include <math.h>
#include <algorithm>
using namespace std;

extern bool vis;
bool static_sim;
unsigned int joins = 0;

vector<uint> Chord::rtable_sz;

#ifdef RECORD_FETCH_LATENCY
double _allfetchlat = 0.0;
double _allfetchsz = 0.0;
unsigned int _allfetchnum = 0;
#endif

// Builds a Chord node for address `i`.
// Reads the protocol parameters from `a` (falling back to the defaults given
// below), derives this node's identifier, and initializes the location table.
// If `l` is non-null it is used as the location table, otherwise a fresh
// LocTable is allocated. If `name` is non-null (and "randid" is off) the id is
// derived from `name` rather than from the IP address.
Chord::Chord(IPAddress i, Args& a, LocTable *l, const char *name) : P2Protocol(i), _isstable (false)
{
  static_sim = (a.find("static_sim") != a.end());

  //stabilization timer
  _stab_basic_timer = a.nget<uint>("basictimer", 10000, 10);
  _stab_succlist_timer = a.nget<uint>("succlisttimer",_stab_basic_timer,10);

  //location table timeout values
  //_timeout = a.nget<uint>("timeout", 5*_stab_succlist_timer, 10);

  //successors
  _nsucc = a.nget<uint>("successors", 16, 10);

  //fragments
  _frag = a.nget<uint>("m",1,10);

  //how many successors are fragments on?
  _allfrag = a.nget<uint>("allfrag",1,10);
  assert(_allfrag <= _nsucc);

  //recursive routing?
  _recurs = a.nget<uint>("recurs",1,10);
  _recurs_direct = a.nget<uint>("recurs_direct",1,10);
  _stopearly_overshoot = a.nget<uint>("stopearlyovershoot",0,10);

  //parallel lookup? parallelism only works in iterative lookup now
  _parallel = a.nget<uint>("parallelism",1,10);
  _alpha = a.nget<uint>("alpha",1,10);

  //lookup using ipkey?
  _ipkey = a.nget<uint>("ipkey",0,10);
  _asap = a.nget<uint>("asap",_frag,10);
  _max_lookup_time = a.nget<uint>("maxlookuptime",4000,10);
  _to_multiplier = a.nget<uint>("timeout_multiplier", 3, 10);
  _random_id = a.nget<uint>("randid",0,10);

  _wkn.ip = 0;
  _learn = a.nget<uint>("learn",0,10);
  learntable = NULL;
  assert(_frag <= _nsucc);

  me.ip = ip();
  assert(me.ip>0);
  if (_random_id)
    me.id = ConsistentHash::getRandID();
  else {
    if (name)
      me.id = ConsistentHash::ipname2chid(name);
    else
      me.id = ConsistentHash::ip2chid(me.ip);
  }

  if (l)
    loctable = l;
  else
    loctable = New LocTable();

  loctable->set_timeout(0);
  loctable->set_evict(false);
  loctable->init(me);

  if (vis) {
    printf ("vis %llu node %16qx\n", now (), me.id);
  }

  _stab_basic_running = false;
  _stab_basic_outstanding = 0;
  _join_scheduled = 0;
  _last_succlist_stabilized = 0;
}

// Forwards one message's accounting to both Node bandwidth recorders.
void
Chord::record_stat(IPAddress src, IPAddress dst, uint type, uint num_ids, uint num_else)
{
  Node::record_bw_stat(type,num_ids,num_else);
  Node::record_inout_bw_stat(src,dst,num_ids,num_else);
}

// Prints the global statistics (only from the node whose ip is 1) and
// releases the location table.
Chord::~Chord()
{
  if (me.ip == 1) { //same hack as tapestry.C so statistics only gets printed once
    Node::print_stats();
    printf("<-----STATS----->\n");
    sort(rtable_sz.begin(),rtable_sz.end());
    uint totalrtable = 0;
    uint rsz = rtable_sz.size();
    for (uint i = 0; i < rsz; i++)
      totalrtable += rtable_sz[i];
    if (rsz > 0) {
      // The cast must apply to the product: (uint)0.1*rsz is always 0.
      printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n",
          rtable_sz[(uint)(0.1*rsz)], rtable_sz[(uint)(0.5*rsz)],
          rtable_sz[(uint)(0.9*rsz)], (double)totalrtable/(double)rsz);
    } else {
      // No samples recorded: keep the line format but do not index an empty vector.
      printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n", 0u, 0u, 0u, 0.0);
    }
    printf("<-----ENDSTATS----->\n");
#ifdef RECORD_FETCH_LATENCY
    printf("fetch lat: %.3f %.3f %u\n", _allfetchlat/_allfetchnum, _allfetchsz/_allfetchnum, _allfetchnum);
#endif
    printf("total joins seen %u\n",joins);
  }
  delete loctable;
}

// Returns "<now> <proto>(<ip>,<id>)" in a static buffer; the buffer is
// overwritten by the next call, so copy the result if it must be kept.
char *
Chord::ts()
{
  static char buf[50];
  // Bounded write: the protocol name has no fixed length.
  snprintf(buf, sizeof(buf), "%llu %s(%u,%qx)", now(), proto_name().c_str(), me.ip, me.id);
  return buf;
}

// Returns the log-line prefix "<now> <proto>(<ip>,<id>,<first_ip>) ".
string
Chord::header()
{
  char buf[128];
  snprintf(buf, sizeof(buf), "%llu %s(%u,%qx,%u) ", now(),proto_name().c_str(),me.ip,me.id,first_ip());
  return string(buf);
}

// Formats `id` in hexadecimal followed by a single space.
string
Chord::printID(CHID id)
{
  char buf[128];
  snprintf(buf, sizeof(buf), "%qx ",id);
  return string(buf);
}

// Marks, in the sz-by-sz row-major `matrix`, an edge from this node to every
// other alive node in the location table. IPs are 1-based indices.
void
Chord::add_edge(int *matrix, int sz)
{
  assert(ip() <= (uint)sz);
  vector<IDMap> v = loctable->get_all();
  for (uint i = 0; i < v.size(); i++) {
    if (v[i].ip != me.ip && Network::Instance()->getnode(v[i].ip)->alive()) {
      assert(v[i].ip <= (uint)sz);
      matrix[(me.ip-1)*sz + v[i].ip -1] = 1;
    }
  }
}

// In a static simulation, runs initstate() once for an alive node.
void
Chord::check_static_init()
{
  if ((!static_sim) || (!alive())) return;
  if (!_inited) {
    _inited = true;
    this->initstate();
  }
}

// Asks the location table for the next hop towards key `k` (queried as k-1).
Chord::IDMap
Chord::next_hop(ConsistentHash::CHID k)
{
  return loctable->next_hop(k-1);
}

// Checks the lookup result `v` for key `k` against the observer's sorted node
// list: starting from the first alive, initialized node found at or after the
// upper bound of `k`, each entry of `v` must match the next node in the list.
// Returns false (and logs) at the first mismatch, true otherwise.
bool
Chord::check_correctness(CHID k, vector<IDMap> v)
{
  vector<IDMap> ids = ChordObserver::Instance(NULL)->get_sorted_nodes();
  IDMap tmp;
  tmp.id = k;
  uint idsz = ids.size();
  // With no known nodes there is nothing to compare against; indexing ids or
  // taking "% idsz" below would be undefined.
  if (idsz == 0)
    return v.empty();
  uint pos = upper_bound(ids.begin(), ids.end(), tmp, Chord::IDMap::cmp) - ids.begin();
  uint iter = 0;
  while (iter<idsz) {
    if (pos >= idsz) pos = 0;
    Chord *node = (Chord *)Network::Instance()->getnode(ids[pos].ip);
    if (Network::Instance()->alive(ids[pos].ip)
        && node->inited())
      break;
    pos++;
    iter++;
  }
  // If no live node was found the scan can leave pos == idsz; wrap it so the
  // comparisons below stay in range.
  if (pos >= idsz) pos = 0;

  for (uint i = 0; i < v.size(); i++) {
    if (ids[pos].ip == v[i].ip) {
      pos = (pos+1) % idsz;
    } else if (ConsistentHash::betweenrightincl(k,ids[pos].id,v[i].id)) {
      // NOTE(review): this branch and the final else log and return identically.
      CDEBUG(2) << "lookup incorrect(?) key" << printID(k) << " succ should be "
        << ids[pos].ip << ","<< printID(ids[pos].id) << "instead of " <<
        v[i].ip << "," << printID(v[i].id) << endl;
      return false;
    } else {
      CDEBUG(2) << "lookup incorrect(?) key" << printID(k) << " succ should be "
        << ids[pos].ip << ","<< printID(ids[pos].id) << "instead of " <<
        v[i].ip << "," << printID(v[i].id) << endl;
      return false;
    }
  }
  return true;
}

//XXX Currently, it does not handle losses
//Recursive lookup cannot deal with duplicate packets
//also, add_node() does not reset status flag yet
//
// Sends the RPC `fn` to `dst`, retrying with a doubled timeout after each
// failure. The attempt counter starts at 2, so at most (num_retry - 2) RPCs
// are sent. Returns true as soon as one RPC succeeds; returns false if this
// node is no longer alive after an attempt or all attempts fail.
template<class BT, class AT, class RT>
bool Chord::failure_detect(IDMap dst, void (BT::* fn)(AT *, RT *), AT *args, RT *ret,
    uint type, uint num_args_id, uint num_args_else, int num_retry)
{
  bool r;
  Time retry_to = TIMEOUT(me.ip,dst.ip);
  int checks;
  //checks = loctable->add_check(dst,0);
  checks = 2;
  //if (checks == -1) return false;
  //int tmp;
  while (checks < num_retry) {
    record_stat(me.ip, dst.ip, type,num_args_id,num_args_else);
    r = doRPC(dst.ip, fn, args, ret, retry_to);
    if (!alive())
      return false;
    if (r) {
      return true;
    }
    checks++;
    retry_to = retry_to * 2;
  }
  return false;
}

// Hook for learning about node `n`; intentionally empty here.
void
Chord::learn_info(IDMap n)
{
  //do nothing;
}

// Hook for proposing a replacement for node `n`; this version never replaces.
bool
Chord::replace_node(IDMap n, IDMap &replacement)
{
  return false;
}

// Starts a user lookup described by `args`.
// Without ipkey mode, "key" is the identifier to look up. In ipkey mode, "key"
// is a node IP: the lookup is dropped if that node is not alive, otherwise the
// key becomes that node's id + 1.
void
Chord::lookup(Args *args)
{
  check_static_init();

  lookup_args *a = New lookup_args;
  if (!_ipkey) {
    a->key = args->nget<CHID>("key");
    a->ipkey = 0;
  } else {
    a->ipkey = args->nget<IPAddress>("key");
    if (!Network::Instance()->alive(a->ipkey)) {
      delete a;
      return;
    }
    // dynamic_cast yields NULL if the target is not a Chord node; fail loudly
    // instead of dereferencing it.
    Chord *target = dynamic_cast<Chord *>(Network::Instance()->getnode(a->ipkey));
    assert(target);
    a->key = target->id() + 1;
  }
  CDEBUG(1) << "start looking up key " << printID(a->key) << "ipkey "
    << a->ipkey << endl;
  assert(a->key);

  a->start = now();
  a->latency = 0;
  a->num_to = 0;
  a->total_to = 0;
  a->retrytimes = 0;
  a->hops = 0;

  lookup_internal(a); //lookup internal deletes a
}

// Performs one lookup attempt for `a` and records the outcome.
// Takes ownership of `a`: it is deleted on every path except the retry path,
// where it is handed to delaycb() to re-run this function 100 time units later.
void
Chord::lookup_internal(lookup_args *a)
{
  vector<IDMap> v;
  IDMap lasthop;
  lasthop.ip = 0;

  a->retrytimes++;

  if (_recurs) {
    v = find_successors_recurs(a->key, _frag, TYPE_USER_LOOKUP, &lasthop,a);
  } else {
    v = find_successors(a->key, _frag, TYPE_USER_LOOKUP, &lasthop,a);
  }

  if (!alive()) {
    delete a;
    return;
  }

  if (_learn) {
    if (lasthop.ip)
      learn_info(lasthop);
    for (uint i = 0; i < v.size(); i++)
      learn_info(v[i]);
  }

  if (a->latency >= _max_lookup_time) {
    record_lookup_stat(me.ip, lasthop.ip, a->latency, false, false, a->hops, a->num_to, a->total_to);
  }else if ((!_ipkey  && v.size() > 0) || (_ipkey && lasthop.ip == a->ipkey)) {
    record_lookup_stat(me.ip, lasthop.ip, a->latency, true, true, a->hops, a->num_to, a->total_to);
#ifdef RECORD_FETCH_LATENCY
    assert(static_sim);
    assert(v.size() >= _frag);
    vector<uint> tmplat;
    tmplat.clear();
    Topology *t = Network::Instance()->gettopology();
    for (uint i = 0; i < v.size(); i++) {
      if (tmplat.size() < _allfrag)
        tmplat.push_back(2*t->latency(me.ip, v[i].ip));
    }
    sort(tmplat.begin(),tmplat.end());
    // The hard-coded index below requires at least seven samples.
    assert(tmplat.size() > 6);
    _allfetchlat += (double)tmplat[6]; //XXX this is a hack
    _allfetchsz += (double)tmplat.size();
    _allfetchnum++;
#endif
    CDEBUG(1) << "lookup correct key " << printID(a->key) << "interval "
      << a->latency << endl;
  }else{
    if (_ipkey && a->retrytimes<=1 && (!Network::Instance()->alive(a->ipkey))) {
      // first attempt and the target is not alive: nothing is recorded
    }else if (_ipkey && (a->retrytimes >2 || Network::Instance()->alive(a->ipkey))) {
      record_lookup_stat(me.ip, lasthop.ip, a->latency, false, false, a->hops, a->num_to, a->total_to);
    }else{
      CDEBUG(1) << "lookup incorrect key " << printID(a->key)
        << "lastnode " << lasthop.ip << "," << printID(lasthop.id)
        << "latency " << a->latency << " start " << a->start << endl;
      a->latency += 100;
      delaycb(100, &Chord::lookup_internal, a);
      return;
    }
  }
  delete a;
}

// RPC handler: runs a join-type successor lookup for args->key (recursive or
// iterative depending on _recurs) and fills ret->v, ret->last and ret->dst.
void
Chord::find_successors_handler(find_successors_args *args,
                               find_successors_ret *ret)
{
  check_static_init();
  if (_recurs)
    ret->v = find_successors_recurs(args->key, args->m, TYPE_JOIN_LOOKUP, &(ret->last));
  else
    ret->v = find_successors(args->key, args->m, TYPE_JOIN_LOOKUP, &(ret->last));

  if (ret->v.size() > 0)
    CDEBUG(3) <<" find_successors_handler key "<< printID(args->key) <<"succ "
      << ret->v[0].ip << "," << printID(ret->v[0].id) << endl;
  ret->dst = me;
}

vector<Chord::IDMap>
Chord::find_successors(CHID key, uint m, uint type, IDMap *lasthop, lookup_args *a)
{
  //parallelism controls how many queries are inflight
  Time before;
  next_args na;
  hash_map<IPAddress, bool> asked;
  hop_info lastfinished;
  list<hop_info> tasks;
  list<hop_info> savefinished;
  bool ok;
  unsigned rpc, donerpc;
  unsigned totalrpc = 0;
  nextretinfo *reuse = NULL;
  nextretinfo *p;
  RPCSet rpcset;
  RPCSet alertset;
  hash_map<unsigned, unsigned> resultmap; //result to rpc number mapping
  hash_map<unsigned, alert_args*> alertmap;
  hop_info h;
  vector<IDMap> results;
  vector<IDMap> to_be_replaced;
  to_be_replaced.clear();
  uint outstanding, parallel, alertoutstanding;
  uint rpc_i;

  h.from = me;
  h.to = me;
  h.hop = 1;

  na.src = me;
  na.type = type;
  na.key = key;
  na.m = m;

  if (type == TYPE_USER_LOOKUP) {
    parallel = _parallel;
    na.alpha = _alpha;
  } else {
    na.alpha = 1;
    parallel = 1;
  }

  vector<nextretinfo*> rpcslots;
  for(uint i = 0; i < parallel; i++) {
    reuse = New nextretinfo;
    reuse->free = true;
    rpcslots.push_back(reuse);
  }

  //init my timeout calculations
  vector<uint> num_timeouts;
  vector<uint> time_timeouts;
  vector<uint> num_hops;
  for (uint i = 0; i < parallel; i++) {
    num_timeouts.push_back(0);
    time_timeouts.push_back(0);
    num_hops.push_back(0);
  }

  na.deadnodes.clear();
  na.retry = false;

  savefinished.clear();
  lastfinished.from.ip = 0;
  lastfinished.to.ip = me.ip;
  lastfinished.to.id = me.id;
  lastfinished.hop = 0;

  //put myself in the task queue
  tasks.push_back(h);
  asked[me.ip] = true;
  results.clear();
  outstanding = alertoutstanding = 0;

  /*
  vector<hop_info> recorded;
  recorded.clear();
  */

  CDEBUG(2) << "start lookup key " << printID(key) << "type "
    << type << endl;

  while (1) {
    assert(totalrpc < 100 && (na.deadnodes.size()<20));
    if ((tasks.size() == 0) && (outstanding == 0))
      tasks.push_back(lastfinished);
    while ((outstanding < parallel) && (tasks.size() > 0)) {
      h = tasks.front();
      if (ConsistentHash::betweenrightincl(h.to.id, key, lastfinished.to.id) && (outstanding > 0))
        break;
      else if ((h.to.ip == lastfinished.to.ip) && (totalrpc > 0)) {
        assert(outstanding == 0);
        na.retry = true;
        tasks.pop_front();
      }else if (ConsistentHash::betweenrightincl(lastfinished.to.id, key, h.to.id) || (totalrpc == 0)) {
        na.retry = false;
        tasks.pop_front();
      }else{
        CDEBUG(2) << "key " << printID(key) << "timeout resume to node " <<
          lastfinished.to.ip << "," << printID(lastfinished.to.id) << endl;
        na.retry = true;
        h = lastfinished;
      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -