⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 onehop.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
/*
 * Copyright (c) 2003-2005 [Anjali Gupta]
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

/* jy: (main additions)
 * all doRPC and xRPC have been changed to fd_xRPC that does failure detection
 * (for a lossy/congested network)
 * for every lookup, check its correctness against global state
 * to fix(?) currently failure_detection is done with the failure of one packet
 */

#include "onehop.h"
#include "consistenthash.h"
#include "observers/onehopobserver.h"
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <iostream>
#include <vector>

// NOTE: both debug macros expand to an unbraced `if` followed by a statement
// that already ends in ';'. Do not use them as the sole body of an if/else
// without braces.
#define TESTPRINT(me,he,tag) if (me.ip==1159 && he.ip==2047) printf("WUWU now %llu tag %s\n", now(),tag);
#define DEBUG_MSG(n,msg,s) if (p2psim_verbose && n.ip == OneHop::debug_node) printf("%llu %u,%qx %s knows debug from sender %u,%qx\n",now(), me.ip, me.id, msg, s.ip, s.id);
#define MAX_IDS_MSG 295
#define MAX_LOOKUP_TIME 4000
//XXX:Strange bug - delaycb gives join a 0ed argument

// Definitions of the class-wide (static) counters and accumulators.
long OneHop::lookups = 0;
long OneHop::failed = 0;
long OneHop::two_failed = 0;
long OneHop::old_lookups = 0;
long OneHop::old_failed = 0;
long OneHop::old_two_failed = 0;
long OneHop::num_nodes = 0;
long OneHop::joins = 0;
long OneHop::crashes = 0;
long OneHop::same_lookups = 0;
long OneHop::same_failed = 0;
Time OneHop::act_interslice = 0;
Time OneHop::tot_interslice = 0;
long OneHop::nonempty_outers = 0;
long OneHop::nonempty_leaders = 0;
Time OneHop::act_intraslice = 0;
Time OneHop::tot_intraslice = 0;
Time OneHop::total_empty = 0;
Time OneHop::total_count = 0;
Time OneHop::exp_intraslice = 0;
bw OneHop::bandwidth = 0;
bw OneHop::leader_bandwidth = 0;
bw OneHop::messages = 0;
bw OneHop::leader_messages = 0;
bw OneHop::lookup_bandwidth = 0;
bw OneHop::old_bandwidth = 0;
bw OneHop::old_lookup_bandwidth = 0;
bw OneHop::old_leader_bandwidth = 0;
unsigned OneHop::start = 0;
Time OneHop::old_time = 0;
int OneHop::_publish_time = -1; //jy: control how frequently one publishes statistics
unsigned OneHop::num_violations = 0; //jy: anjali sends too many ids in one message, calculate how many violations there are
vector<double> OneHop::sliceleader_bw_avg;
unsigned OneHop::debug_node = 1;

/*
 * Construct a OneHop node for address `i`.
 *
 * Reads the "slices", "units" and "stab" arguments from `a`, creates the
 * location table, inserts this node into it and resets all per-node state.
 */
OneHop::OneHop(IPAddress i , Args& a) : P2Protocol(i)
{
  _k = a.nget<uint>("slices",5,10);
  //number of units -- must be an odd number
  _u = a.nget<uint>("units",5,10);
  // slice_size below divides by _k; a zero slice count is a configuration bug.
  assert(_k > 0);

  loctable = New OneHopLocTable(_k, _u);
  me.id = id();
  me.ip = ip();
  loctable->add_node(me);

  retries = 0;
  _stab_timer = a.nget<uint>("stab",1000,10);
  leader_log.clear();
  outer_log.clear();
  _join_complete = false;
  _retry_timer = 1000;
  prev_slice_leader = false;
  _to_multiplier = 3; //jy  doRPC suffer from timeout if the dst is dead, the min of this value = 3
  // slice_size = 0xffffffffffffffff / _k;
  slice_size = ((ConsistentHash::CHID)-1)/ _k;
  //lookups = 0;
  //failed = 0;
  _leaderstab_running = false;
  low_to_high.clear();
  high_to_low.clear();
  sent_low = false;
  sent_high = false;
  if (me.ip == 1)
    OneHop::sliceleader_bw_avg.clear();
  last_stabilize = 0;
  _wkn.ip = 0;
}

/*
 * Record bandwidth statistics for one message sent from `src` to `dst`.
 *
 * `type` must be <= 5. When `num_ids` exceeds 300, every full group of 300
 * ids is counted in OneHop::num_violations and adds 40 to the `num_else`
 * value that is passed on.
 * NOTE(review): the 40 is presumably per-extra-packet header overhead, and
 * 300 does not match MAX_IDS_MSG (295) -- confirm both.
 */
void
OneHop::record_stat(IPAddress src, IPAddress dst, uint type, uint num_ids, uint num_else)
{
  assert(type <= 5);
  // printf("now %llu warning: %u sending too much %u\n", now(), me.ip, num_ids);
  //assert(num_ids < 300);
  uint other = num_else;
  if (num_ids > 300) {
    const uint groups = num_ids/300;
    OneHop::num_violations += groups;
    other = 40 * groups + num_else;
  }
  Node::record_bw_stat(type,num_ids,other);
  Node::record_inout_bw_stat(src,dst,num_ids,other);
}

//jy: check correctness of lookups
/*
 * Compare a lookup answer against the observer's global node list.
 *
 * Starting at the first node whose id sorts after `k` (upper_bound with
 * OneHop::IDMap::cmp), walks the sorted ring to the first node the Network
 * reports alive and returns true iff that node's ip equals `n.ip`.
 * Returns false when the global list is empty or contains no live node
 * (previously: out-of-bounds read / endless loop).
 */
bool
OneHop::check_correctness(CHID k, IDMap n)
{
  const vector<IDMap> &ids = OneHopObserver::Instance(NULL)->get_sorted_nodes();
  const uint idsz = ids.size();
  if (idsz == 0)
    return false;

  IDMap tmp;
  tmp.id = k;
  tmp.ip = 0;
  uint pos = upper_bound(ids.begin(), ids.end(), tmp, OneHop::IDMap::cmp) - ids.begin();

  // Probe each ring position at most once so a fully dead ring terminates.
  uint probed = 0;
  for (; probed < idsz; probed++) {
    if (pos >= idsz) pos = 0;
    if (Network::Instance()->alive(ids[pos].ip))
      break;
    pos++;
  }
  if (probed == idsz)
    return false;

  DEBUG(4) << now() << ":" << ip() << "," << printID(id())
    << ":key " << printID(k) << " correct? "
    << (ids[pos].ip==n.ip?1:0) << " reply " << n.ip << "," << printID(n.id)
    << " real " << ids[pos].ip << "," << printID(ids[pos].id) << endl;

  return ids[pos].ip == n.ip;
}

/*
 * Entry point for a user lookup of args["key"].
 *
 * Does nothing if this node is not alive. Otherwise allocates the per-lookup
 * state (owned and freed by lookup_internal) and starts the lookup.
 */
void
OneHop::lookup(Args *args)
{
  if (!alive()) return;
  lookup_internal_args *la = New lookup_internal_args;
  la->k = args->nget<CHID>("key");
  la->start_time = now();
  la->hops = 0;
  la->timeouts = 0;
  la->timeout_lat = 0;
  la->attempts = 0;
  lookup_internal(la);
}

/*
 * Drive one lookup until an owner is confirmed, the node dies, or
 * MAX_LOOKUP_TIME has elapsed since la->start_time.
 *
 * Takes ownership of `la`: it is deleted on every path except when the
 * lookup is re-scheduled because the join has not completed yet.
 * A lookup is recorded as completed only if some node actually answered
 * "I am the owner" within the time budget; its correctness is then checked
 * against global state with check_correctness().
 */
void
OneHop::lookup_internal(lookup_internal_args *la)
{
  la->attempts++;
  if (!alive() || now()-la->start_time > MAX_LOOKUP_TIME) {
    record_lookup_stat(me.ip, me.ip, now()-la->start_time, false, false,
                       la->hops,la->timeouts,la->timeout_lat);
    delete la;
    return;
  }
  if (!_join_complete) {
    DEBUG(1) << now() << ":" << me.ip << ","<< printID(me.id)
      << ": failed at time "<<now()<< "coz join is incomplete last_join time " << last_join << endl;
    // Try again in a second; `la` stays alive for the retry.
    delaycb(1000, &OneHop::lookup_internal, la);
    /*
    record_lookup_stat(me.ip, me.ip, now()-la->start_time, false, false,
        la->hops,la->timeouts,la->timeout_lat);
    delete la;
    */
    return;
  }
  //DEBUG(1) << ip() << ":(slice " << slice(id()) << "): Lookup for " << k << "(slice " << slice(k) << ")" << endl;

  lookup_args a;
  lookup_ret r;
  a.key = la->k;
  a.sender = me;
  a.dead_nodes.clear();

  IDMap succ_node = me;
  bool found_owner = false;  // set only when a node confirms ownership
  while ((now()-la->start_time) < MAX_LOOKUP_TIME) {
    succ_node = loctable->succ(la->k);
    assert(succ_node.ip);
    record_stat(me.ip,succ_node.ip,TYPE_USER_LOOKUP,2);

    // Give the reply a defined value in case the RPC fails before the
    // handler fills it in.
    r.is_owner = false;
    bool ok = doRPC(succ_node.ip,&OneHop::lookup_handler,&a,&r,
                    TIMEOUT(me.ip,succ_node.ip));
    if (ok) record_stat(succ_node.ip,me.ip,TYPE_USER_LOOKUP,r.is_owner?0:1,1);
    if (me.ip!=succ_node.ip)
      la->hops++;
    if (!alive()) break;

    DEBUG(1) << now() << ":" << me.ip << "," << printID(me.id)
             << " lookup done from " << succ_node.ip << "/"
             << printID(succ_node.id) << " for key " << printID(a.key)
             << ", ok: " << ok << ", is_owner: " << r.is_owner;
    // correct_owner is only meaningful when the RPC succeeded.
    if( ok && !r.is_owner ) {
      DEBUG(1) << ", correct_owner=" << printID(r.correct_owner.id);
    }
    DEBUG(1) << endl;

    if (ok) {
      if (r.is_owner) {
        found_owner = true;
        break;
      }else{
        DEBUG_MSG(r.correct_owner, "lookup", succ_node);
        loctable->add_node(r.correct_owner);
      }
    }else{
      la->timeouts++;
      la->timeout_lat += TIMEOUT(me.ip, succ_node.ip);
      loctable->del_node(succ_node.id);
      test_inform_dead_args *aa = New test_inform_dead_args;
      aa->suspect= succ_node;
      aa->informed = me;
      aa->justdelete = true;
      delaycb(0,&OneHop::test_dead_inform,aa);
      //return;
      // please don't tell us about this guy anymore, he sucks
      a.dead_nodes.push_back(succ_node.id);
    }
  }

  if (!alive()) {
    delete la;
    return;
  }

  const Time elapsed = now()-la->start_time;
  if (!found_owner || elapsed > MAX_LOOKUP_TIME) {
    // Ran out of time (including landing exactly on the deadline) without a
    // confirmed owner, or the confirmation arrived too late.
    record_lookup_stat(me.ip, me.ip, elapsed, false, false,
                       la->hops,la->timeouts,la->timeout_lat);
  } else {
    const bool correct = check_correctness(la->k,succ_node);
    record_lookup_stat(me.ip, succ_node.ip, elapsed, true, correct,
                       la->hops,la->timeouts,la->timeout_lat);
    //delaycb(100,&OneHop::lookup_internal, la);
  }
  delete la;
}

/* anjali's old lookup procedure
   it only tries two hops and declare failure
   why not try more hops??
void
OneHop::lookup(Args *args)
{
  if (!alive()) return;
  if (!_join_complete) {
    //jy: this is a failure
    record_lookup_stat(me.ip, me.ip, 0, false, false, 1, 0, 0);
    return;
  }
  Time before0 = now(); //jy: record the start of lookup
  lookups++;
  CHID k = args->nget<CHID>("key");
  bool same = (slice(k) == slice(me.id));
  if (same) same_lookups++;
  //DEBUG(1) << ip() << ":(slice " << slice(id()) << "): Lookup for " << k << "(slice " << slice(k) << ")" << endl;
  assert(k);
  lookup_args *a = New lookup_args;
  lookup_ret r;
  a->key = k;
  a->sender = me;
  IDMap succ_node = loctable->succ(k);
  assert(succ_node.ip);
  record_stat(TYPE_USER_LOOKUP,2);
  bool ok = doRPC(succ_node.ip,&OneHop::lookup_handler,a,&r);
  if (ok) record_stat(TYPE_USER_LOOKUP,1);
  Time before1 = now(); //jy: record the end of first lookup hop
  if (!alive()) goto LOOKUP_DONE;
  lookup_bandwidth += 24;
  if (!ok) {
    failed++;
    two_failed++;
    if (same) same_failed++;
    DEBUG(5) << ip() << ":Lookup failed due to dead node "<< succ_node.ip << endl;
    IDMap *n = New IDMap();
    *n = succ_node;
    assert(n->ip>0);
    delaycb(0, &OneHop::test_dead, n);
    IDMap succ_node = loctable->succ(succ_node.id+1);
    record_stat(TYPE_USER_LOOKUP,2);
    bool ok = doRPC(succ_node.ip, &OneHop::lookup_handler, a, &r);
    if (!alive()) goto LOOKUP_DONE;
    if (ok) record_stat(TYPE_USER_LOOKUP,1);

    lookup_bandwidth += 24;
    if (!ok) {
      IDMap *nn = New IDMap;
      *nn = succ_node;
      assert(nn->ip>0);
      delaycb(0,&OneHop::test_dead,nn);
    }
    else if (r.is_owner) {
      if (check_correctness(k,succ_node))
        record_lookup_stat(me.ip, succ_node.ip, now()-before0, true, true, 2, 1, now()-before1); //jy: record lookup stat
      else
        record_lookup_stat(me.ip, succ_node.ip, now()-before0, false, false, 2, 1, now()-before1); //jy: record lookup stat
      two_failed--;
    }
  }
  else {
    if (!r.is_owner) {
      failed++;
      two_failed++;
      lookup_bandwidth += 20;
      if (same) same_failed++;
      DEBUG(5) << ip() << ":Lookup failed:"<< succ_node.ip << "(" << succ_node.id << ") thinks " << r.correct_owner.ip << "(" << r.correct_owner.id << ")"  << endl;
      IDMap corr_owner = r.correct_owner;
      record_stat(TYPE_USER_LOOKUP,1);
      bool ok = doRPC(corr_owner.ip, &OneHop::lookup_handler, a, &r);
      if (ok) record_stat(TYPE_USER_LOOKUP, 1);
      if (!alive()) goto LOOKUP_DONE;
      lookup_bandwidth += 48;
      if (ok && r.is_owner) {
        two_failed--;
        loctable->add_node(corr_owner);
        LogEntry *e = New LogEntry(corr_owner, ALIVE, now());
        leader_log.push_back(*e);
        delete e;
        if (check_correctness(k,corr_owner))
          record_lookup_stat(me.ip, corr_owner.ip, now()-before0, true, true, 2, 0, 0); //jy: record lookup stat
        else
          record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 0, 0); //jy: record lookup stat
      }else if (!ok) {
        record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 1, now()-before1); //jy: record lookup stat
      }else { //ok && !r.is_owner
        record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 0, 0); //jy: record lookup stat
      }
    }else{
      if (check_correctness(k,succ_node))
        record_lookup_stat(me.ip, succ_node.ip, now()-before0, true, true, 1, 0, 0); //jy: record lookup stat
      else
        record_lookup_stat(me.ip, succ_node.ip, now()-before0, false, false, 1, 0, 0); //jy: record lookup stat
    }
  }
LOOKUP_DONE:
  delete a;
}
*/

/*
 * RPC handler for a lookup request.
 *
 * Removes every id the sender reported as dead from the local table, learns
 * the sender if it was unknown (also logging it as ALIVE in leader_log), and
 * answers whether this node is the successor of a->key according to its own
 * table. When it is not, r->correct_owner carries the node it believes is.
 */
void
OneHop::lookup_handler(lookup_args *a, lookup_ret *r)
{
  // Defined answer even on the early dead-node return below.
  r->is_owner = false;

  for( size_t i = 0; i < a->dead_nodes.size(); i++ ) {
    loctable->del_node(a->dead_nodes[i]);
  }

  IDMap succ_node = loctable->succ(a->sender.id);
  if (succ_node.id != a->sender.id) {
    if (!alive()) return;
    DEBUG(5) << now() << ":" << ip() << "," << printID(id())
      << ":Found new node " << a->sender.ip << " via lookup\n";
    DEBUG_MSG(a->sender,"lookup_handler",a->sender);
    loctable->add_node(a->sender);
    leader_log.push_back(LogEntry(a->sender, ALIVE, now()));
  }

  CHID key = a->key;
  IDMap corr_succ = loctable->succ(key);
  if (corr_succ.id == me.id)
    r->is_owner = true;
  else {
    r->is_owner = false;
    r->correct_owner = corr_succ;
  }
}

void
OneHop::join(Args *args)
{
  if (!alive()) return;
  me.ip = ip();
  me.id = ConsistentHash::ip2chid(me.ip);
  OneHopObserver::Instance(NULL)->addnode(me); //jy
  last_join = now();
  //jy: get a random alive node from observer
  while (!_wkn.ip || _wkn.ip == me.ip)
    _wkn = OneHopObserver::Instance(NULL)->get_rand_alive_node();
  if (ip() == OneHop::debug_node)
    DEBUG(0) << now() << ":" << me.ip << ","<< printID(me.id) << " start to join  wkn " << _wkn.ip << endl;
  loctable->add_node(me);
  if (args && args->nget<uint>("first",0,10)==1) { //jy: a dirty hack, one hop does not like many nodes join at once
    _join_complete = true;
    delaycb(_stab_timer/2, &OneHop::stabilize, (void *)0);
  }else if (_wkn.ip != ip()) {
    IDMap fr;
    fr.id = _wkn.id;
    fr.ip = _wkn.ip;
    join_leader(fr, fr, args);
  }
  else {
    _join_complete = true;
    joins++;
    num_nodes++;
    stabilize((void *)0);
    publish((void *)0);
  }
  //who is my successor?
  if (alive()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -