⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chord.c

📁 比较权威的p2p仿真软件
💻 C
📖 第 1 页 / 共 5 页
字号:
      //find a free rpc slot      uint ii = 0;      for (ii = 0; ii < parallel; ii++) {	if (rpcslots[(rpc_i+ii)%parallel]->free) {	  rpc_i = (rpc_i+ii)%parallel;	  p = rpcslots[rpc_i];	  p->free = false;	  break;	}      }      assert(ii<parallel);      p->link = h;      totalrpc++;      assert(h.to.ip > 0 && h.to.ip < 3000);      //recorded.push_back(h);      CDEBUG(2) << "key " << printID(key) << "sending to next hop node "<< h.to.ip	<< "," << printID(h.to.id) << endl;      record_stat(me.ip,h.to.ip,type,1+na.deadnodes.size(),0);      assert((h.to.ip == me.ip) || (h.to.ip!=h.from.ip));      rpc = asyncRPC(h.to.ip, &Chord::next_handler, &na, &(p->ret), TIMEOUT(me.ip,h.to.ip));      rpcset.insert(rpc);      resultmap[rpc] = rpc_i;      outstanding++;    }        assert(outstanding > 0);    //fill out all my allowed parallel connections, wait for response    before = now();    donerpc = rcvRPC(&rpcset, ok);    rpc_i = resultmap[donerpc];    assert(rpc_i>=0 && rpc_i<parallel);    if (a)       a->latency += (now()-before);    outstanding--;    reuse = rpcslots[rpc_i];    assert(!reuse->free);    reuse->free = true;    //count statistics about hop count and timeouts    num_hops[rpc_i]++;    if (!ok) {      num_timeouts[rpc_i]++;      time_timeouts[rpc_i]+=TIMEOUT(me.ip,reuse->link.to.ip);    }    if (!alive()) goto DONE;    if (ok) {      record_stat(reuse->link.to.ip,me.ip,type,reuse->ret.done?reuse->ret.next.size():reuse->ret.v.size());       if (reuse->link.from.ip == me.ip) {	assert(reuse->ret.dst.ip == reuse->link.to.ip);	loctable->update_ifexists(reuse->ret.dst);       }      CDEBUG(2) << "key " << printID(key) << "outstanding " << outstanding 	<< " deadsz " << na.deadnodes.size() << " from " << reuse->link.to.ip 	<< "," << printID(reuse->link.to.id) << "done? " << (reuse->ret.done?1:0)	<< " nextsz " << reuse->ret.next.size() << " savefinishedsz " 	<< savefinished.size() << endl;      if ((reuse->link.from.ip == me.ip) && (!static_sim))	loctable->update_ifexists(reuse->link.to);       if (reuse->ret.done) {	lastfinished = reuse->link;	goto DONE;//success      }      if (a && a->ipkey && reuse->link.to.ip == a->ipkey) {	lastfinished = reuse->link;	goto DONE;      }      //XXX: to be fixed, does not look like it will work for parallel lookup      if ((reuse->ret.next.size() == 0 ) && (outstanding == 0)) {//	  && reuse->link.to.ip == lastfinished.to.ip)  //this will fail coz i cannot mark this node as dead	lastfinished = reuse->link;	goto DONE;//failed      }            list<hop_info>::iterator iter = tasks.begin();      IDMap tmptmp;      for (uint i = 0; i < reuse->ret.next.size(); i++) {	tmptmp = reuse->ret.next[i];	if (asked.find(reuse->ret.next[i].ip) != asked.end()) 	  continue;	if (_learn)	  learn_info(reuse->ret.next[i]);	if (ConsistentHash::betweenrightincl(reuse->ret.next[i].id, key, lastfinished.to.id))	  continue;	h.from = reuse->link.to;	h.to = reuse->ret.next[i];	h.hop = reuse->link.hop + 1;	while (iter != tasks.end()) {	  if (ConsistentHash::between(iter->to.id, key, reuse->ret.next[i].id))	    break;	  iter++;	}	assert(h.to.ip != me.ip);	tasks.insert(iter, h);	asked[h.to.ip] = true;      }      // any improvement?      if (na.retry && lastfinished.to.ip == reuse->link.to.ip) 	if ((tasks.size() == 0) || ConsistentHash::betweenrightincl(tasks.front().to.id, key, lastfinished.to.id))	  goto DONE;      //insert into the history of finished rpcs      iter = savefinished.begin();      while (iter != savefinished.end()) {	if (ConsistentHash::betweenrightincl(me.id, iter->to.id, reuse->link.to.id)) 	  break;	iter++;      }      if (iter == savefinished.end() || reuse->link.to.ip != iter->to.ip) 	savefinished.insert(iter, reuse->link);      lastfinished = savefinished.back();      CDEBUG(2) << "key " << printID(key) << "from " << reuse->link.to.ip	<< "," << printID(reuse->link.to.id) << "next? " << 	(reuse->ret.next.size()>0?reuse->ret.next[0].ip:0) << ","	<< (reuse->ret.next.size()>0?printID(reuse->ret.next[0].id):0)	<< " task top " << (tasks.size()>0?tasks.front().to.ip:0)	<< "," << (tasks.size()>0?printID(tasks.front().to.id):0)	<< " tasksz " << tasks.size() << " lastfinished " 	<< lastfinished.to.ip << "," << printID(lastfinished.to.id) << endl;    } else {      if (reuse->link.to.ip == me.ip) {	//a very special wierd case, the node has gone down and up during the RPC	goto DONE;      }      CDEBUG(2) << "key " << printID(key) << "outstanding " << outstanding	<< " deadsz " << na.deadnodes.size() << " from " << reuse->link.to.ip	<< "," << printID(reuse->link.to.id) << "DEAD savefinishedsz " 	<< savefinished.size() << " lastfinished " << lastfinished.to.ip << endl;        if (reuse->link.from.ip == me.ip) {	if (_learn)	  to_be_replaced.push_back(reuse->link.to);      }      //notify      alert_args *aa = New alert_args;      aa->n = reuse->link.to;      record_stat(me.ip,reuse->link.from.ip,type,1);      assert(reuse->link.from.ip > 0 && reuse->link.from.ip < 3000);      rpc = asyncRPC(reuse->link.from.ip, &Chord::alert_handler, aa,(void *)NULL);      alertset.insert(rpc);      alertmap[rpc] = aa;      alertoutstanding++;      if (ConsistentHash::betweenleftincl(lastfinished.to.id, key, reuse->link.to.id)) {	na.deadnodes.push_back(reuse->link.to);      }      if (lastfinished.to.ip ==  reuse->link.to.ip) {	assert(savefinished.size()>0);	savefinished.pop_back();	CDEBUG(2) << "key " << printID(key) << "lastfinished " << lastfinished.to.ip	<< "," << printID(lastfinished.to.id) << "dead new last finished " << 	savefinished.back().to.ip << endl;	lastfinished = savefinished.back();      }    }    if (a && a->latency >= _max_lookup_time) {      goto DONE;    }  }DONE:  //get rid the crap nodes that i've learned  if (learntable) {    for (uint i = 0; i < na.deadnodes.size(); i++)       learntable->del_node(na.deadnodes[i],true);     IDMap replacement;    for (uint i = 0; i < to_be_replaced.size();i++)       replace_node(to_be_replaced[i],replacement);  }  //jesus christ i'm done, however, i need to clean up my shit  assert(reuse);  assert(rpc_i >=0 && rpc_i<parallel);  if (a) {    a->num_to += num_timeouts[rpc_i];    a->total_to += time_timeouts[rpc_i];    a->hops += num_hops[rpc_i];  }  if ((type == TYPE_USER_LOOKUP) && (alive())) {    /*    Topology *t = Network::Instance()->gettopology();    printf("%s lookup key %qx, hops %d totalrpc %d (lookup info hop %u, num_to %u, retry %u)\n", ts(), key, lastfinished.hop, totalrpc,a->hops,a->num_to,a->retrytimes);    printf("%s key %qx route: ", ts(), key);    IDMap last;    last = lastfinished.to;    for (uint i = recorded.size()-1; i >= 0; i--) {      if (last.ip == me.ip) break;      if (recorded[i].to.ip == last.ip) {	printf("(%u,%qx %llu) ", recorded[i].to.ip, recorded[i].to.id, (uint)2*t->latency(me.ip, last.ip));	last = recorded[i].from;      }    }    printf("\n");    */    if ((reuse->ret.done) && (reuse->ret.correct))      results = reuse->ret.v;  }else {    if (reuse->ret.done)       results = reuse->ret.v;  }  if (lasthop) {    *lasthop = reuse->link.to;  }  for (uint i = 0; i < outstanding; i++) {    donerpc = rcvRPC(&rpcset, ok);    rpc_i = resultmap[donerpc];    assert(rpc_i>=0 && rpc_i < parallel);    reuse = rpcslots[rpc_i];    if (ok)       record_stat(reuse->link.to.ip,me.ip,type,reuse->ret.done?reuse->ret.next.size():reuse->ret.v.size());  }  for (uint i = 0; i < parallel; i++) {    delete rpcslots[i];  }  for (uint i = 0;i < alertoutstanding; i++) {    donerpc = rcvRPC(&alertset, ok);    alert_args *aa = alertmap[donerpc];    if (ok)      record_stat(aa->n.ip,me.ip,type,0);    delete alertmap[donerpc];  }  return results;}/* the recursive query goes directly back to the sender. */voidChord::final_recurs_hop(next_recurs_args *args, next_recurs_ret *ret){  lookup_path tmp;  tmp.n = me;  tmp.tout = 0;  ret->path.push_back(tmp);  ret->finish_time = now();}vector<Chord::IDMap>Chord::find_successors_recurs(CHID key, uint m, uint type, IDMap *lasthop, lookup_args *a) {  next_recurs_args fa;  fa.key = key;  fa.type = type;  fa.m = m;  fa.src = me;  CDEBUG(3) << "find_successors_recurs start key " << printID(key) << endl;  //do the parallel recursive lookup thing  //doRPC(me.ip, &Chord::next_recurs_handler, args, ret);  hash_map<unsigned, next_recurs_ret*> resultmap;  next_recurs_ret *reuse = NULL;  next_recurs_ret *p = NULL;  uint outstanding, parallel;  bool ok;  Time before = now();  vector<IDMap> results;  results.clear();  lookup_path tmp;    unsigned rpc, donerpc;  RPCSet rpcset;  if (a) {    fa.ipkey = a->ipkey;    parallel = _parallel;  } else {    fa.ipkey = 0;    parallel = 1;  }  fa.src = me;  IDMap nexthop;  nexthop.id = key;  nexthop.ip = 0;  outstanding = 0;  assert(!_stopearly_overshoot || _parallel == 1); //i do not know how to do parallel lookup with stopearly and overshoot  while (1) {    if (!alive()) return results;    IDMap succ = loctable->succ(me.id+1,LOC_HEALTHY);    if (succ.ip == 0) {      if (!_join_scheduled) {	_join_scheduled++;	delaycb(0, &Chord::join, (Args *)0);	CDEBUG(2) << " find_successors_recurs key " << printID(key) 	  << "no succ " << endl;      }      if (lasthop) *lasthop = me;      return results;    }    while (outstanding < parallel) {      if (_stopearly_overshoot) 	nexthop = me;      else {	nexthop = loctable->next_hop(nexthop.id-1);	if(nexthop.ip == 0) {	  if (reuse) {	    delete reuse;	    reuse = NULL;	  }	  break;	}      }      if (reuse) {	p = reuse;	reuse = NULL;      }else {	p = New next_recurs_ret;	p->path.clear();      }      p->correct = false;      p->lasthop = me;      p->v.clear();      p->finish_time = 0;      if ((a && a->ipkey && me.ip == a->ipkey) || (ConsistentHash::between(me.id,succ.id,key))) {	if (fa.m == 1) {	  p->v.push_back(succ);	}else{	  p->v = loctable->succs(key,fa.m,LOC_HEALTHY);	}	p->correct = check_correctness(key,p->v);	p->finish_time = now();	reuse = p;	goto RECURS_DONE;      }      if (nexthop.ip!=me.ip) {	tmp.n = nexthop;	tmp.tout = 0;	p->path.push_back(tmp);      }      CDEBUG(2) << " key " << (a?a->ipkey:0) << "," << printID(key) 	<< "via nexthop " << nexthop.ip << "," << printID(nexthop.id) 	<< "outstanding " << outstanding << " parallel " << parallel << endl;      record_stat(me.ip,nexthop.ip,type,1);      p->nexthop = nexthop;      p->prevhop = me;      assert(!reuse);      rpc = asyncRPC(nexthop.ip, &Chord::next_recurs_handler, &fa, p, TIMEOUT(me.ip,nexthop.ip));      rpcset.insert(rpc);      resultmap[rpc] = p;      outstanding++;    }    assert(outstanding>0);    donerpc = rcvRPC(&rpcset, ok);    outstanding--;    reuse = resultmap[donerpc];    if (ok) {      loctable->update_ifexists(reuse->nexthop);      record_stat(reuse->nexthop.ip,me.ip,type,resultmap[donerpc]->v.size());      goto RECURS_DONE;    }else{      //do a long check to see if next hop is really dead      assert(reuse->path.size()>0);      IDMap n = reuse->path[reuse->path.size()-1].n;      CDEBUG(2) << " key " << (a?a->ipkey:0) << "," << printID(key) << "nexthop "	<< n.ip << "," << printID(n.id) << "failed outstanding " << outstanding <<endl;      reuse->path[reuse->path.size()-1].tout=1;      IDMap replacement;      //if ((!_learn) || (!replace_node(n,replacement))) {      int check = loctable->add_check(n);      if (check == LOC_ONCHECK) {	alert_args *tmp = New alert_args;	tmp->n = n;	tmp->dst = me.ip;	delaycb(1, &Chord::alert_delete, tmp);      }      replace_node(n,replacement);      //}     }  }RECURS_DONE:  assert(reuse);  if (lasthop) {    *lasthop = (reuse->lasthop);  }  CDEBUG(1)<<" key " << printID(key) <<"finished lasthop "<< reuse->lasthop.ip     <<","<< printID(reuse->lasthop.id) << "vsize " << reuse->v.size()     <<" correct? "<< (reuse->correct?1:0) <<" hops "<<reuse->path.size()<<endl;  if (a) {    IDMap prev = me;    if (_recurs_direct) {      assert(reuse->finish_time > 0);      a->latency += (reuse->finish_time-before);    }else{      a->latency += (now()-before);    }    a->hops += reuse->path.size();    for (uint j= 0; j< reuse->path.size(); j++) {      if (reuse->path[j].tout>0) {	a->num_to++;	a->total_to += TIMEOUT(prev.ip,reuse->path[j].n.ip);      }else{	prev = reuse->path[j].n;      }    }  }  if (!a || reuse->correct) {    results = reuse->v;  }else{    results.clear();  }  if (reuse->lasthop.ip && _learn)     learn_info(reuse->lasthop);  delete reuse;  //garbage collection  for (uint i = 0; i < outstanding; i++) {    donerpc = rcvRPC(&rpcset, ok);    if (ok) {      loctable->update_ifexists(resultmap[donerpc]->nexthop);      record_stat(resultmap[donerpc]->nexthop.ip,me.ip,type,resultmap[donerpc]->v.size());    }    if ((_learn) && (resultmap[donerpc]->lasthop.ip))      learn_info(resultmap[donerpc]->lasthop);    delete resultmap[donerpc];  }  return results;}char *Chord::print_path(vector<lookup_path> &p, char *tmp){  char *begin = tmp;  tmp += sprintf(tmp,"<");  for (uint i = 0; i < p.size(); i++) {    tmp += sprintf(tmp, " %u",p[i].n.ip);  }  tmp += sprintf(tmp," >");  return begin;}//XXX: in a dynamic environment, the current implementation has a //wierd mechanism for implementing timeout.//it sends an empty reply to the sender to indicate that //some node currently holding the packet has died//in a real implemenation, the sender should implement some kind of //timeout mechanism.voidChord::next_recurs_handler(next_recurs_args *args, next_recurs_ret *ret){  vector<IDMap> succs;  IDMap succ;  bool r;  lookup_path tmp;  IDMap next;  uint sz, i;  check_static_init();  assert(alive());  if (_learn) {    assert(args->src.ip>0 && args->src.ip < 100000);    learn_info(args->src);    if (ret->prevhop.ip!=args->src.ip)       learn_info(ret->prevhop);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -