⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chord.c

📁 比较权威的p2p仿真软件
💻 C
📖 第 1 页 / 共 5 页
字号:
  }  Topology *t = Network::Instance()->gettopology();  CDEBUG(2) << " next_recurs key " << args->ipkey << "," << printID(args->key)    << "arrived pathsz " << ret->path.size() <<" src "<< args->src.ip << endl;  while (1) {    if (!alive()) {      ret->v.clear();      ret->correct = false;      ret->finish_time = now();      ret->lasthop.ip = 0;      ret->nexthop = me;      return;    }    succs = loctable->succs(me.id+1,_nsucc,LOC_HEALTHY);    if (succs.size() == 0) {      //lookup failed      //XXX do i need to backtrack?     ret->v.clear();     vector<IDMap> tmp = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK);      //rejoin baby       if ((!_join_scheduled) && (tmp.size()==0)) {	_join_scheduled++;	delaycb(0, &Chord::join, (Args *)0);	CDEBUG(2) <<" next_recurs key "<< printID(args->key)	  << "no succ rejoin "<< endl;      }      ret->correct = false;      ret->lasthop = me;      if (args->type==TYPE_USER_LOOKUP && _recurs_direct) {	record_stat(me.ip,args->src.ip,args->type,ret->v.size(),0);	r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret);	if (r) 	  record_stat(args->src.ip,me.ip,args->type,0);	else	  ret->finish_time=now(); //who cares about finishtime now      }      ret->nexthop = me;      return;    }    succ = succs[0];    next = loctable->next_hop(args->key);    if ((_ipkey && me.ip == args->ipkey) || 	(ConsistentHash::betweenrightincl(me.id, succs[0].id, args->key))) {      ret->v.clear();      sz = args->m < succs.size()? args->m : succs.size();      for (i = 0; i < sz; i++) {	ret->v.push_back(succs[i]);	if (ret->v.size() >= args->m) break;      }      if (args->type == TYPE_USER_LOOKUP) {	ret->correct = check_correctness(args->key,ret->v);	if (!ret->correct) 	  CDEBUG(3) << "next_recurs_handler key " << printID(args->key) 	    << " incorrect succ " << succs[0].ip << "," 	    << printID(succs[0].id) << endl;      } else	ret->correct = true;      ret->lasthop = me;      if (_recurs_direct) {	record_stat(me.ip,args->src.ip,args->type,ret->v.size(),0);	r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret);	if (r) 	  record_stat(args->src.ip,me.ip,args->type,0);	else	  ret->finish_time=now(); //who cares about finishtime now      }      ret->nexthop = me; //the return path      return;    }else if (_stopearly_overshoot && ret->v.size()==0 && ConsistentHash::betweenrightincl(me.id, succs[succs.size()-1].id, args->key)) {      assert(static_sim);      uint start = 0;      for (i = 0; i < succs.size(); i++) {	if ( ConsistentHash::betweenrightincl(me.id, succs[i].id, args->key)) {	  ret->v.push_back(succs[i]);	  if (ret->v.size()>1)	    assert(ConsistentHash::between(args->key,ret->v[ret->v.size()-1].id,ret->v[0].id));	  if (!start) start = i;	}      }      if (ret->v.size() >= args->m) {	assert(ret->v.size() <= _nsucc);	ret->correct = check_correctness(args->key,ret->v);	assert(ret->correct);	if (_recurs_direct) {	  record_stat(me.ip,args->src.ip,args->type,1,0); 	  assert(ret->v.size() >= args->m && args->m >= 7);	  r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret);	  if (r) 	    record_stat(args->src.ip,me.ip,args->type,0);	}	ret->nexthop = me;	return;      }      int currsz = ret->v.size();      Time min_lat = 1000000;      Time lat;      for (i = 0; i < succs.size(); i++) {	if ((currsz + i + 1)>= args->m) {	  lat = t->latency(me.ip, succs[i].ip);	  if (min_lat > lat) {	    min_lat = lat;	    next = succs[i];	  }	}      }      assert(min_lat < 1000000);    }else if (_stopearly_overshoot && ret->v.size() > 0) {      assert(_nsucc == succs.size());      assert(static_sim);      //handling overshoot, just append my succ list      uint start = 0;      if (me.ip != ret->v[ret->v.size()-1].ip) {	for (start = 0; start < succs.size(); start++) {	  if (succs[start].ip == ret->v[ret->v.size()-1].ip) {	    break;	  }	}	start++;      }      if (start > succs.size()) {	start = 0;	while (start < succs.size()) {	  if (ConsistentHash::between(me.id,succs[start].id,args->key) && ConsistentHash::between(args->key,succs[start].id, ret->v[ret->v.size()-1].id)) 	    break;	  start++;	}      }      IDMap tmp,tmp2,tmp3,tmp4;      for (i = start; i < succs.size(); i++) {	tmp = succs[i];	if (i>1) {	  tmp2 = succs[i-1];	}	tmp3 = ret->v[ret->v.size()-1];	tmp4 = ret->v[0];	ret->v.push_back(succs[i]);        if (ret->v.size()>1)	  assert(ConsistentHash::between(args->key,ret->v[ret->v.size()-1].id,ret->v[0].id));	if (ret->v.size() >= args->m) {	  assert(ret->v.size() <= _nsucc);	  ret->correct = check_correctness(args->key,ret->v);	  assert(ret->correct);	  if (_recurs_direct) {	    record_stat(me.ip,args->src.ip,args->type,1,0);	    assert(ret->v.size() >= args->m);	    r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret);	    if (r)	      record_stat(args->src.ip,me.ip,args->type,0);	  }	  ret->nexthop = me;	  return;	}      }      assert(0);    } else {      if (ret->path.size() >= 30) {	printf("WARNING: path too long for key %qx m %d %s: ", args->key, args->m, ts());	for (uint i = 0; i < ret->path.size(); i++) {	  printf("(%u,%qx,%u) ", ret->path[i].n.ip, ret->path[i].n.id, ret->path[i].tout);	}	printf("\n");	if (_recurs_direct) {	  ret->finish_time = now(); //not correct, but what the heck	}	ret->correct = false;	ret->v.clear();	ret->nexthop = me;	return;      }     }    assert(_stopearly_overshoot|| (next.ip != me.ip && ConsistentHash::between(me.id, args->key, next.id)));    tmp.n = next;    tmp.tout = 0;    ret->path.push_back(tmp);    record_stat(me.ip,next.ip,args->type,1);    ret->nexthop = next;    ret->prevhop = me;    r = doRPC(next.ip, &Chord::next_recurs_handler, args, ret, TIMEOUT(me.ip,next.ip));    if (_learn && !_recurs_direct) {      if (ret->lasthop.ip) 	learn_info(ret->lasthop);      for (uint x = 0; x < ret->v.size(); x++) 	learn_info(ret->v[x]);    }    if (!_recurs_direct && (!alive())) {      CDEBUG(3) << " next_recurs_handler lost key " << printID(args->key)<<endl;      ret->lasthop.ip = 0;      ret->v.clear();      ret->nexthop = me;      ret->correct = false;      if (!ret->finish_time)	ret->finish_time = now();      return;    }    if (r) {      if (!_recurs_direct) {	record_stat(next.ip,me.ip,args->type,ret->v.size());      }else{	record_stat(next.ip,me.ip,args->type,0);      }      if (!static_sim) loctable->update_ifexists(ret->nexthop);       ret->nexthop = me;      return;    }else{      CDEBUG(3) << "next_recurs_handler key " << printID(args->key) 	<< " nexthop " << next.ip << "," << printID(next.id) << endl;      if (vis) 	printf ("vis %llu delete %16qx %16qx succ %u,%qx\n", now (), me.id, next.id, succ.ip,succ.id);      ret->path[ret->path.size()-1].tout = 1;            //do a long check to see if next hop is really dead      IDMap replacement;      if ((!_learn) || (!replace_node(next,replacement))) {	int check = loctable->add_check(next);	if (check == LOC_ONCHECK) {	  alert_args *tmp = New alert_args;	  tmp->n = next;	  tmp->dst = me.ip;	  delaycb(1, &Chord::alert_delete, tmp);	}      }    }  }}voidChord::null_handler (void *args, IDMap *ret) {  return;}voidChord::alert_delete(alert_args *aa){  if (aa->dst != me.ip)     record_stat(me.ip,aa->dst,TYPE_MISC,1);  doRPC(aa->dst, &Chord::alert_handler, aa, (void *)NULL, TIMEOUT(me.ip,aa->dst));  delete aa;}// Always attempt to go to the predecessor for the key regardless of mvoidChord::next_handler(next_args *args, next_ret *ret){  if (_learn)     learn_info(args->src);  ret->dst = me;  check_static_init();  /* don't trust other people's information about my dead neighbors coz of non-transitivity     however, in case this is a retried query, take it more seriously*/  /*  if ((args->retry) && (s >= 0)) {    for (s = 0; s < (int)succs.size(); s++) {      for (j = 0; j < (int)args->deadnodes.size(); j++)  {	if (succs[s].ip == args->deadnodes[j].ip) {	  alert_args *tmp = New alert_args;	  tmp->n = succs[s];	  tmp->dst = me.ip;	  delaycb(1, &Chord::alert_delete, tmp);	  break;	}      }      if (j >= (int) args->deadnodes.size()) {	break;      }    }  }  */  if (args->deadnodes.size() > 0) {    for (uint i = 0; i < args->deadnodes.size(); i++) {      int check = loctable->add_check(args->deadnodes[i]);      if (check == -1 || check == LOC_DEAD) {      } else {	alert_args *tmp = New alert_args;	tmp->n = args->deadnodes[i];	tmp->dst = me.ip;	delaycb(1, &Chord::alert_delete, tmp);      }    }  }  int s;  vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_HEALTHY);  if (succs.size() > 0)     s = 0;  else    s = -1;  assert((!static_sim) || succs.size() >= _allfrag);  ret->v.clear();  ret->next.clear();  if (s < 0) {    ret->lastnode = me;    ret->done = true;    vector<IDMap> tmp = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK);    CDEBUG(3) << "next_handler key " << printID(args->key)       << " failed due to newjoin deadsz " << args->deadnodes.size()       << endl;    ret->correct = false;    //rejoin baby    if ((!_join_scheduled) && (tmp.size()==0)){      _join_scheduled++;      delaycb(0, &Chord::join, (Args *)0);    }  } else if (ConsistentHash::betweenrightincl(me.id, succs[s].id, args->key)) {     //XXX: need to take care of < m nodes situation in future    for (int i = s; i < (int)succs.size(); i++) {      ret->v.push_back(succs[i]);      if (ret->v.size() >= args->m) break;    }    ret->lastnode = me;    ret->done = true;    if (args->type == TYPE_USER_LOOKUP) {      ret->correct = check_correctness(args->key,ret->v);      if (!ret->correct) 	CDEBUG(3) << "key " << printID(args->key) << "incorrect succ " 	  << succs[0].ip << "," << printID(succs[0].id) << endl;    }else      ret->correct = true; //i don't check correctness for non-user lookups  } else {    ret->done = false;    ret->next.clear();    ret->next = loctable->next_hops(args->key,args->alpha);  }}// External event that tells a node to contact the well-known node// and try to join.voidChord::join(Args *args){  if (static_sim) {    if ((args) && (!_inited))      notifyObservers((ObserverInfo *)"join");    _inited = true;    return;  }  if (args) {    me.ip = ip();    if (_random_id)      me.id = ConsistentHash::getRandID();    else       me.id = ConsistentHash::ip2chid(me.ip);    me.timestamp = 0;    loctable->init(me);    if (_learn)       learntable->init(me);    _last_join_time = now();    ChordObserver::Instance(NULL)->addnode(me);    notifyObservers((ObserverInfo *)"join");    _join_scheduled++;    // XXX: Thomer says: not necessary    // node()->set_alive(); //if args is NULL, it's an internal join    CDEBUG(1) << "start to join " << endl;  }else{    if (!alive()) {      _join_scheduled--;      return;    }    CDEBUG(2) << "rescheduled join " << endl;  }  if (vis) {    printf("vis %llu join %16qx\n", now (), me.id);  }  if (!_wkn.ip) {    assert(args);    _wkn.ip = args->nget<IPAddress>("wellknown");    assert (_wkn.ip);    _wkn.id = dynamic_cast<Chord *>(Network::Instance()->getnode(_wkn.ip))->id();  }  find_successors_args fa;  find_successors_ret fr;  //fa.key = me.id + 1;  fa.key = me.id - 1;  //try to get multiple successors in case some failed  assert(_nsucc > 3);  fa.m = _nsucc;  bool ok = failure_detect(_wkn, &Chord::find_successors_handler, &fa, &fr, TYPE_JOIN_LOOKUP,1,0);  assert(_wkn.ip == fr.dst.ip);  _wkn = fr.dst;  joins++;  if (ok) record_stat(_wkn.ip,me.ip,TYPE_JOIN_LOOKUP, fr.v.size());  if (!alive()) {    _join_scheduled--;    return;  }  if (!ok || fr.v.size() < 1) {    CDEBUG(2) << "join failed retry later" << endl;    _join_scheduled--;    if (!_join_scheduled) {      delaycb(200, &Chord::join, (Args *)0);      _join_scheduled++;    }    return;  }  assert (ok);  for (uint i = 0; i < fr.v.size(); i++) {    if (fr.v[i].ip != me.ip)       loctable->add_node(fr.v[i],true);    _inited = true;  }  if (fr.v.size()>1)     _last_succlist_stabilized = now();  CDEBUG(1) << "joined succ " << fr.v[0].ip << "," << printID(fr.v[0].id) << endl;   if (!_stab_basic_running) {    _stab_basic_running = true;    delaycb(0, &Chord::reschedule_basic_stabilizer, (void *) 0);  }else{    Chord::stabilize();  }  if ((loctable->size() < 2)  && (alive())) {    CDEBUG(2) << "after stabilize join failed" << endl;    _join_scheduled--;    if (!_join_scheduled) {      delaycb(200, &Chord::join, (Args *)0);      _join_scheduled++;      return;    }  }  _join_scheduled--;}voidChord::reschedule_basic_stabilizer(void *x){  assert(!static_sim);  if (!alive()) {    _stab_basic_running = false;    CDEBUG(3) << "node dead cancel stabilizing " << endl;    return;  }  _stab_basic_running = true;  if (_stab_basic_outstanding > 0) {    assert(0);  }else{    _stab_basic_outstanding++;    stabilize();    _stab_basic_outstanding--;    assert(_stab_basic_outstanding == 0);  }  delaycb(_stab_basic_timer, &Chord::reschedule_basic_stabilizer, (void *) 0);}// Which paper is this code from? -- PODC // stabilization is not run in one non-interruptable piece// after each possible yield point, check if the node is dead.voidChord::stabilize(){  assert(!static_sim);  if (!_inited) return;  IDMap pred1 = loctable->pred(me.id-1, LOC_ONCHECK);  IDMap succ1 = loctable->succ(me.id+1, LOC_ONCHECK);  vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_ONCHECK);  if (!succ1.ip) return;  CDEBUG(3) << "chord_stabilize start pred " << pred1.ip << ","     << printID(pred1.id) << "succ " << succ1.ip << ","     << printID(succ1.id) << endl;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -