⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 accordion.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
  }else{    mee = n;  }  while (1) {    if (newi >= newlist.size())      break;    if (oldi >= oldlist.size()) {      if (is_succ)  { // i need to clean up some shit	if ((oldi-1>=0) && (oldlist.size()>0))	  sss = loctable->succ(oldlist[oldi-1].id);      }      while (newi < newlist.size()) {	if (is_succ) {	  while (newlist[newi].ip!=sss.ip && ConsistentHash::between(mee.id,newlist[newi].id,sss.id)) {	    sss.timestamp = now();	    loctable->del_node(sss);	    sss = loctable->succ(sss.id+1);	  }	}	loctable->add_node(newlist[newi],is_succ,false,0,0,true);	newi++;	oldi++;	if (is_succ) {	  sss = loctable->succ(newlist[newi-1].id+1);	}	if (is_succ && oldi == _nsucc) 	  break;      }      if (is_succ && oldi == _nsucc) 	loctable->last_succ(newlist[(newi-1)]);      while (newi < newlist.size()) {	loctable->add_node(newlist[newi],is_succ,false,0,0,true);	newi++;      }      break;    }    if (oldlist[oldi].ip == newlist[newi].ip) {      loctable->add_node(newlist[newi],is_succ,false,0,0,true);      newi++;      oldi++;    }else if (ConsistentHash::between(_me.id, newlist[newi].id, oldlist[oldi].id)) {      oldlist[oldi].timestamp = now();      loctable->del_node(oldlist[oldi]);//XXX the timestamp is not very correct      oldi++;    }else {      loctable->add_node(newlist[newi],is_succ,false,0,0,true);      newi++;    }  }  if (is_succ) {    vector<IDMap> updated = loctable->succs(_me.id+1,_nsucc);    ADEBUG(4) << "consolidate fix_succ : old (" <<print_succs(oldlist) << ") new: ("      << print_succs(newlist) << ") updated (" << print_succs(updated)      << ")" << endl;  }}/* ---------------------------- empty queue ------------------------------ */voidAccordion::empty_cb(void *x)  //wrapper function{  Accordion *c = (Accordion *)x;  return c->empty_queue(NULL);}voidAccordion::empty_queue(void *a) {  IDMap succ = loctable->succ(_me.id+1);  if (!succ.ip){    if (!_join_scheduled || (now()-_join_scheduled)>20000) {      ADEBUG(4) << "empty_queue locsz " << loctable->size() 	<< " reschedule join" << endl;      delaycb(0,&Accordion::join,(Args *)0);    }    return;  }  if (now()>=_next_adjust)    adjust_parallelism();  _empty_times++;  IDMap askwhom,pred, next;  askwhom.ip = _me.ip;  double oldest = 0.0;  uint op = 0;  double tt = _tt;  while ((pred.ip == _me.ip) || (!pred.ip)){    if (_fixed_stab_int)       //oldest = loctable->pred_biggest_gap(pred, next, 20*_fixed_stab_int, _tt);      op = loctable->sample_smallworld(_est_n, askwhom, pred, next, tt, _max_succ_gap);    else       //oldest = loctable->pred_biggest_gap(pred, next, 20*80*1000/_bw_overhead, _tt);      op  = loctable->sample_smallworld(_est_n, askwhom, pred,next, tt, _max_succ_gap);    tt = 1-((1-tt)/2.0);    if (tt > 0.9)       break;  }  if (askwhom.ip == _me.ip || !askwhom.ip) {    ADEBUG(4) << "nothing to learn oldest " << oldest << " locsz " << loctable->size() << endl;    return;  }  if ((askwhom.ip != pred.ip) && (!ConsistentHash::between(askwhom.id,next.id,pred.id) || askwhom.ip == next.ip))     fprintf(stderr,"%llu %u %u %u %u %u fuck!\n", now(), _me.ip, askwhom.ip, pred.ip, next.ip, ((Accordion *)Network::Instance()->getnode(askwhom.ip))->budget());  learn_args *la = New learn_args;  learn_ret *lr = New learn_ret;  Time to = _stab_basic_timer;  la->m = _learn_num;  la->timeout = (Time) _tt;  la->n = askwhom;  la->src = _me;  la->src.alivetime = now()-_last_joined_time;  la->start = pred;  la->end = next;  ADEBUG(2) << "empty_queue quota " << _rate_queue->quota() << " succsz " << loctable->succ_size()     << " locsz " << loctable->size() << " livesz "     << loctable->live_size() << " locsz_used "     << loctable->size(LOC_HEALTHY,_tt) << " livesz_used " << loctable->live_size(_tt)    << " learn from " << la->n.ip << ","     << printID(la->n.id) << " start " << la->start.ip << "," << printID(la->start.id) << " end " <<    la->end.ip << "," << printID(la->end.id) << " old " << (now()-la->n.timestamp)     << " para " << _parallelism << " est_tt " << _tt << " op " << op     << " statsz " << _stat.size() << " est_n " << _est_n << endl;  _rate_queue->do_rpc(askwhom.ip, &Accordion::learn_handler,       &Accordion::learn_cb, la, lr, 3, TYPE_FINGER_UP,       PKT_SZ(0,1), PKT_SZ(2*la->m,0),TIMEOUT(_me.ip,pred.ip));}voidAccordion::learn_handler(learn_args *la, learn_ret *lr){  la->src.timestamp = now();  loctable->add_node(la->src);  IDMap pred = loctable->pred(_me.id-1);  lr->is_succ = false;  if (la->n.ip == _me.ip)     la->n.alivetime = now()-_last_joined_time;  else    abort();  if (_rate_queue->very_critical()) {    IDMap succ = loctable->succ(_me.id+1);    lr->v.clear();    if (succ.ip)      lr->v.push_back(succ);    lr->is_succ = true;  }else {    if (la->n.ip == _me.ip) {      vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc);      if (scs.size() && ConsistentHash::between(_me.id,scs[scs.size()-1].id,la->end.id)) {	lr->v = loctable->succs(_me.id+1,la->m);	lr->is_succ = true;      }    }    if (!lr->is_succ || la->n.ip!=_me.ip) {      lr->v = loctable->get_closest_in_gap(la->m, la->start.id,la->end.id, la->src, 20*80*1000/_bw_overhead, la->timeout);      if (lr->v.size() < la->m) {	lr->v = loctable->succs(la->start.id+1,la->m);      }    }  }  ADEBUG(4) << "learn_handler from src " << la->src.ip << " is_succ "     << lr->is_succ << " nodes " << print_succs(lr->v) << endl;}intAccordion::learn_cb(bool b, learn_args *la, learn_ret *lr){  uint ret_sz = 0;  if (alive()) {    Time delta = now()-la->n.timestamp;    if (delta < la->n.alivetime && delta > 5000)      add_stat((double)(la->n.alivetime-delta)/(double)(la->n.alivetime),b);    la->n.timestamp = now();    if (b) {      IDMap neighborsucc;      if (lr->is_succ) {	/*	if (lr->v.size()>0) 	  neighborsucc = lr->v[0];	else	  neighborsucc = la->end;	  */	loctable->update_ifexists(la->n,true);	if (lr->v.size()>0	    &&ConsistentHash::distance(la->n.id,lr->v[0].id) > _max_succ_gap) 	  _max_succ_gap = ConsistentHash::distance(la->n.id,lr->v[0].id);	//ConsistentHash::CHID gap = ConsistentHash::distance(la->n.id,neighborsucc.id);	//_max_succ_gap = gap;	if (lr->v.size() > 0) {	  vector<IDMap> oldlist = loctable->between(la->n.id+1,lr->v[lr->v.size()-1].id);	  consolidate_succ_list(la->n,oldlist,lr->v,false);	  if (p2psim_verbose) {	    vector<IDMap> newlist = loctable->between(la->n.id+1,lr->v[lr->v.size()-1].id);	    ADEBUG(4) << "learn_cb After consolidate " << la->n.ip << "," << la->n.alivetime << " : " << print_succs(newlist) << endl;	  }	}      } else {	if (!lr->v.size()) {	  fprintf(stderr,"%llu me %u from %u,%qx,%llu %qx:%qx\n",	      now(),_me.ip,la->n.ip,la->n.id,la->n.alivetime,la->start.id,la->end.id);	}else {	  neighborsucc = lr->v[0];	  loctable->update_ifexists(la->n);	  for (uint i = 0; i < lr->v.size(); i++)  {	    IDMap xx = loctable->succ(lr->v[i].id,LOC_HEALTHY);	    if (xx.ip == lr->v[i].ip && xx.timestamp < lr->v[i].timestamp)	      add_stat((double)xx.alivetime/(double)(lr->v[i].timestamp - xx.timestamp+xx.alivetime), true);	    loctable->add_node(lr->v[i]);	  }	}      }      ADEBUG(4) << "learn_cb quota " << _rate_queue->quota() << " locsz " 	<< loctable->size(LOC_HEALTHY) << " usedsz " << loctable->size(LOC_HEALTHY,la->timeout)	<<  " to " << la->timeout << " livesz " << loctable->live_size(la->timeout) 	<< " learn_cb " << la->m 	<< " from " << la->n.ip << ":" << la->end.ip << " " << printID(la->n.id) << ":" <<  printID(la->end.id)	<< " maxgap " << printID(_max_succ_gap) 	<< " is_succ " << (lr->is_succ?1:0) 	<< " sz " << lr->v.size() << " est_n " << _est_n << " ex " << neighborsucc.alivetime 	<< "," << now()-neighborsucc.timestamp << " : " << print_succs(lr->v) << endl;      ret_sz = PKT_SZ(2*lr->v.size()+1,0);    }else{      loctable->del_node(la->n); //XXX: should not delete a finger after one failure      ADEBUG(4) << " learn_cb quota " << _rate_queue->quota() << " node " << la->n.ip 	<< "," << printID(la->n.id)<< "," << (now()-la->n.timestamp) << "," << la->n.alivetime << "," <<	(double)(la->n.alivetime)/(double)(now()-la->n.timestamp+la->n.alivetime) << " dead " << (b?0:1)	<< " locsz " << loctable->size(LOC_HEALTHY,la->timeout) << " livesz " << loctable->live_size(la->timeout) << endl;    }    if (_rate_queue->empty() && !_fixed_stab_int)       delaycb(0, &Accordion::empty_queue, (void *)0);  }  delete la;  delete lr;  return ret_sz;}stringAccordion::print_succs(vector<IDMap> v){  char buf[1024];  char *b = buf;  b += sprintf(b," ");  Time nn = now();  for (uint i = 0; i < v.size(); i++) {    IDMap haha = v[i];    if (i == v.size()-1) {       b += sprintf(b,"%u(%llu:%llu) ", v[i].ip,v[i].alivetime,now()-v[i].timestamp);    }else{      b += sprintf(b,"%u(%llu:%llu) ", v[i].ip,v[i].alivetime,now()-v[i].timestamp);    }  }  return string(buf);}stringAccordion::printID(ConsistentHash::CHID id){  char buf[128];  sprintf(buf,"%qx ",id);  return string(buf);}bool Accordion::check_pred_correctness(ConsistentHash::CHID k, IDMap n){  IDMap tmp;  tmp.id = k;  uint pos = upper_bound(ids.begin(),ids.end(),tmp, IDMap::cmp) - ids.begin();  if (pos)    pos--;  else    pos = ids.size()-1;  if ((ids[pos].ip == n.ip) || (!Network::Instance()->alive(n.ip)))    return true;  else {    ADEBUG(4) << "key " << printID(k) << "wrong " << n.ip << ","       << printID(n.id) << "right " << ids[pos].ip << "," <<       printID(ids[pos].id) << endl;    return false;  }}voidAccordion::adjust_parallelism(){  if (!_fixed_stab_int) {    uint old_p = _parallelism;    if (_empty_times > _lookup_times)      _parallelism++;    else if (!_empty_times && _lookup_times)      _parallelism = _parallelism/2;    if (_parallelism < 1)      _parallelism = 1;    else if (_parallelism > _max_p)      _parallelism = _max_p;    unsigned long b = Node::collect_stat()?Node::get_out_bw_stat():0;    ADEBUG(4) << "adjust_parallelism from " << old_p      << " to " << _parallelism << " empty_times " << _empty_times << " lookup_times "       << _lookup_times << " bytes " << (b-_last_bytes) << " time "       << now()-_last_bytes_time << endl;    _last_bytes = b;    _last_bytes_time = now();    _empty_times = 0;    _lookup_times = 0;    while (_next_adjust < now())       _next_adjust += _adjust_interval;    _tt = (_parallelism>1)?(est_timeout((exp(log(0.1)/(double)_parallelism)))):est_timeout(0.1);  }  if ((Node::collect_stat()) && (now()-_last_joined_time>600000)){    uint rsz = loctable->size(LOC_HEALTHY,_tt);    rtable_sz.push_back(rsz);    if (rtable_sz.size() > 10000)      rtable_sz.erase(rtable_sz.begin());  }}voidAccordion::add_stat(double ti, bool live){  return; //XXX i do not think this is important  if ((_fixed_stab_int) || (ti <= 0.0) || (ti >= 1.0))    return;  Stat s;  s.alive = live;  s.ti = ti;  _stat.push_back(s);  if (_stat.size()> EST_TIMEOUT_SZ)     adjust_timeout();}doubleAccordion::est_timeout(double prob){  return (1.0-prob);  //return _calculated_prob[(uint)(10*prob)];}voidAccordion::adjust_timeout(){  if (_fixed_stab_int)    return;  uint ssz = _stat.size();  vector<double> new_prob;  sort_live.clear();  sort_dead.clear();  for (uint i = 0; i < _stat.size(); i++) {    if (_stat[i].alive)      sort_live.push_back(_stat[i].ti);    else      sort_dead.push_back(_stat[i].ti);  }  sort(sort_live.begin(),sort_live.end());  sort(sort_dead.begin(),sort_dead.end());  _stat.clear();  if (sort_dead.size() == 0 || sort_live.size() ==0)    return;  new_prob.resize(10);  for (uint i = 0; i < 10; i++)    new_prob[i] = 0.0;  uint lsz = sort_live.size();  uint dsz = sort_dead.size();  uint live = 0, dead = dsz;  uint i = 0, j = 0;  double tt;  while (1) {    if (i < lsz && j < dsz) {      if (sort_live[i] < sort_dead[j]) {	tt = sort_live[i];	live++;	i++;      }else{	tt = sort_dead[j];	dead--;	j++;      }    }else if (i < lsz) {      tt = sort_live[i];      live++;      i++;    }else if (j < dsz) {      tt = sort_dead[j];      dead--;      j++;    }else      break;    uint p = (uint)(10.0*dead/(double)(dead+live))+ 1;    if (p>=10)       p = 9;    if (new_prob[p] <= 0.0000001)      new_prob[p] = tt;  }  _last_calculated = now();  ADEBUG(4) << "estimated timeout " << _calculated_prob[1] << endl;  if (_calculated_prob[1] > 0.99) {    printf("sort_live %u:  ",lsz);    for (uint i = 0; i < sort_live.size(); i++)       printf("%.2f ",sort_live[i]);    printf("\n");    printf("sort_dead %u:  ",dsz);    for (uint i = 0; i < sort_dead.size(); i++)       printf("%.2f ",sort_dead[i]);    printf("\n");    printf("calculated prob : ");    for (uint i = 0; i < _calculated_prob.size();i++)       printf("%.2f ",_calculated_prob[i]);    printf("\n");    new_prob[1] = 0.95;  }  for (uint i = 0; i < 10; i++)     _calculated_prob[i] = 0.9*_calculated_prob[i] + 0.1*new_prob[i];}#include "p2psim/bighashmap.cc"

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -