⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 accordion.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
  if (t) {    if (lr->v.size() == 0) {      ADEBUG(2) << "done lookup key " << printID(la->key) << "from " 	<< la->from.ip << "," << printID(la->from.id) << "failed " << endl;      record_lookup_stat(_me.ip, la->from.ip, now()-t, false,false, 	  la->hops, la->to_num, la->to_lat);    }else{      bool b = check_pred_correctness(la->key, la->from);      ADEBUG(2) << "done lookup key " << printID(la->key) << "from "       << la->from.ip << "," << printID(la->from.id)       << "succ " << lr->v.size() << " " << (lr->v.size()>0?lr->v[0].ip:0) << ","       << printID(lr->v.size()>0?lr->v[0].id:0) << "best " <<       2*Network::Instance()->gettopology()->latency(_me.ip, la->from.ip) << " lat "       << now()-t << " hops " << la->hops << " timeouts " << la->to_num       << " correct? " << (b?1:0) << endl;      if (now()-t > 4000) {	record_lookup_stat(_me.ip, la->from.ip, now()-t, false,	  false, la->hops, la->to_num, la->to_lat);      }else{	record_lookup_stat(_me.ip, la->from.ip, now()-t, true,	  b, la->hops, la->to_num, la->to_lat);      }      for (uint i = 0; i < lr->v.size();i++)	loctable->add_node(lr->v[i]);    }    _outstanding_lookups.remove(la->key);  }}voidAccordion::next_recurs(lookup_args *la, lookup_ret *lr){  IDMap succ = loctable->succ(_me.id+1);  IDMap pred = loctable->pred(_me.id-1);  //update my alivetime  //XXX bug??  la->nexthop.alivetime = now() - _last_joined_time;  //learn the src node if this query  la->src.timestamp = now();  loctable->add_node(la->src);  la->from.timestamp = now();  loctable->add_node(la->from);  if (lr && la->overshoot) {    if (_rate_queue->very_critical() || _rate_queue->critical())      la->learnsz = 1;    if (!_fixed_stab_int)      lr->v = loctable->get_closest_in_gap(la->learnsz, _me.id, la->overshoot, la->from, 20*80*1000/_bw_overhead,la->timeout?la->timeout:est_timeout(0.1));    else      lr->v = loctable->get_closest_in_gap(la->learnsz, _me.id,la->overshoot, la->from, 20*_fixed_stab_int,_fixed_stab_to);    if (!lr->v.size()) {      if (!_rate_queue->critical())	lr->v = loctable->succs(_me.id+1,la->learnsz);      else	lr->v.clear();      lr->is_succ = true;    }else{      lr->is_succ = false;    }  }  //if i have forwarded pkts for this key  //and the packet is droppable  if (_forwarded.find(la->src.id | la->key)) {    ADEBUG(3) << "next_recurs key " << printID(la->key) << "src " <<       la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip       << " forwarded before no_drop? " << (la->no_drop?1:0) << endl;    if (!la->no_drop) return;  }else    _forwarded.insert(la->src.id | la->key, now());  //for adjusting parallelisms in the next discrete interval  if (now() >= _next_adjust)    adjust_parallelism();  _lookup_times++;  if (!succ.ip || succ.ip == _me.ip) {    ADEBUG(4) << "next_recurs not joined key " << printID(la->key)       << "failed" << endl;    if ((!_join_scheduled) || (now()-_join_scheduled) > 20000)      delaycb(0,&Accordion::join, (Args *)0); //join again    if (lr) {      lr->v.clear();      lr->done = false;    }    return;  }  if (succ.ip && ConsistentHash::between(_me.id,succ.id,la->key)) {    lookup_args *lla = New lookup_args;    bcopy(la,lla,sizeof(lookup_args));    lla->from = _me;    lla->from.alivetime = now()-_last_joined_time;    lla->nexthop = lla->src;    lookup_ret *llr = New lookup_ret;    llr->done = true;    llr->v = loctable->succs(_me.id+1,la->m);    ADEBUG(3) << "next_recurs key " << printID(la->key) << "src " <<       la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip       << " no_drop? " << (la->no_drop?1:0) << " done succ "       << succ.ip << "," << printID(succ.id)       << " quota " << _rate_queue->quota() << " qsz " << _rate_queue->size()       << " hops " << lla->hops << endl;    _rate_queue->do_rpc(lla->src.ip, &Accordion::donelookup_handler,	&Accordion::null_cb, lla, llr, lla->ori.ip?0:1, lla->type, 	PKT_SZ(2*llr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip,lla->src.ip));    return;  }  double ttt;  int para;  if (!_fixed_stab_int) {    para = (_rate_queue->quota()+_burst_sz)/(2*PKT_OVERHEAD+ 8*_learn_num);    if (!la->no_drop && _rate_queue->critical())      para = 1;    if (para > _parallelism)      para = _parallelism;    else if (para > la->parallelism)      para = la->parallelism;    else if (para <= 0)      para = 1;    ttt = para > 1? est_timeout((exp(log(0.1)/(double)para))):est_timeout(0.1);  }else{    para = _parallelism;    ttt = _fixed_lookup_to;  }  //vector<IDMap> nexthops = loctable->preds(la->key, para, LOC_HEALTHY, ttt);  vector<IDMap> nexthops = loctable->next_close_hops(la->key, para, _me,ttt);  if (_fixed_stab_int)    ttt = _fixed_stab_to;  /*  else if (ttt > 0.5)    ttt = est_timeout(0.5);    */  uint nsz = nexthops.size();  if (((nsz == 0) || (nexthops[0].ip == _me.ip)) && succ.ip) {    fprintf(stderr,"%llu para %u _para %u shit! %u,%qx key %qx succ %qx\n",now(), para, _parallelism, _me.ip,_me.id, la->key, succ.id);    abort();  }  ADEBUG(3) << "next_recurs " << printID(la->key) << " para " << para << " nsz " << nsz << ": " << print_succs(nexthops) << endl;  IDMap overshoot = loctable->succ(la->key);  bool sent_success;  uint i;  for (i = 0; i < nexthops.size(); i++) {    if (!ConsistentHash::between(_me.id,la->key,nexthops[i].id))      break;    lookup_args *lla = New lookup_args;    bcopy(la,lla,sizeof(lookup_args));    if ((la->no_drop) && i == 0) {      lla->no_drop = true;    } else {      lla->no_drop = false;    }    if (_rate_queue->critical())      lla->learnsz = 1;    lla->timeout = (Time) ttt;    lla->hops = la->hops+1;    lla->nexthop = nexthops[i];    lla->from = _me;    //lla->overshoot = overshoot.id;    lla->overshoot = (i>=1)?nexthops[i-1].id:la->key;    lookup_ret *llr = New lookup_ret;    llr->v.clear();    ADEBUG(3) << "next_recurs key " << printID(la->key) << " quota " << _rate_queue->quota() << " qsz "       << _rate_queue->size() << " locsz " << loctable->size(LOC_HEALTHY, ttt)       << " livesz " << loctable->live_size(ttt)       << " src " << la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip <<       " forward to next " << nexthops[i].ip << "," << printID(nexthops[i].id)  << ","       << now()-nexthops[i].timestamp << "," << nexthops[i].alivetime << " to "       << (double)nexthops[i].alivetime/(double)(nexthops[i].alivetime+ now()-nexthops[i].timestamp)      << " hops " << lla->hops       << " est_to " << ttt << " est_sz " << _stat.size()       << " nsz " << nsz << " i " << i << " nodrop " << lla->no_drop << endl;    if (nexthops[i].ip == _me.ip)  {      fprintf(stderr,"what?! loop?!!!\n");      exit(3);    }    sent_success = _rate_queue->do_rpc(nexthops[i].ip, &Accordion::next_recurs,	&Accordion::next_recurs_cb, lla,llr, lla->ori.ip?0:(lla->no_drop?1:3), lla->type,	PKT_SZ(1,0), PKT_SZ(2*lla->learnsz,0),TIMEOUT(_me.ip,nexthops[i].ip));    if (!sent_success) break;  }  if ((la->no_drop) && (i>0))    _forwarded_nodrop.insert(la->src.id | la->key, (uint)i);}intAccordion::next_recurs_cb(bool b, lookup_args *la, lookup_ret *lr){  int ret_sz = 0;  if (alive()) {    IDMap succ = loctable->succ(_me.id+1);    Time delta = now()-la->nexthop.timestamp;    if (delta < la->nexthop.alivetime && delta > 5000)      add_stat((double)(la->nexthop.alivetime-delta)/(double)(la->nexthop.alivetime), b);    la->nexthop.timestamp = now();    if (b && (la->nexthop.ip!=_me.ip)){      if ((lr->is_succ) && (lr->v.size() > 0)){	loctable->update_ifexists(la->nexthop,true);	vector<IDMap> oldlist = loctable->between(la->nexthop.id+1,lr->v[lr->v.size()-1].id);	consolidate_succ_list(la->nexthop,oldlist,lr->v,false);	vector<IDMap> newlist = loctable->between(la->nexthop.id+1,lr->v[lr->v.size()-1].id);	ADEBUG(4) << "next_recurs_cb After consolidate: " << print_succs(newlist) << endl;      } else {	loctable->update_ifexists(la->nexthop);	for (uint i = 0; i < lr->v.size(); i++)  {	  IDMap xx = loctable->succ(lr->v[i].id,LOC_HEALTHY);	  if (xx.ip == lr->v[i].ip && xx.timestamp < lr->v[i].timestamp && lr->v[i].timestamp - xx.timestamp > 5000)	    add_stat((double)xx.alivetime/(double)(lr->v[i].timestamp - xx.timestamp+xx.alivetime), true);	  loctable->add_node(lr->v[i]);	}      }      ADEBUG(4) << "next_recurs_cb key " << printID(la->key) << "src " 	<< la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip	<< "," << printID(la->nexthop.id) << " learnt " << lr->v.size() << ": " 	<< print_succs(lr->v) << " nodes is_succ " << (lr->is_succ?1:0) 	<< " overshoot " << printID(la->overshoot) << " locsz " 	<< loctable->size(LOC_HEALTHY,0.9) << " livesz " << loctable->live_size(0.9) << endl;      ret_sz = PKT_SZ(2*lr->v.size(),0);    } else {      ADEBUG(4) << "next_recurs_cb key " << printID(la->key) << "src " 	<< la->src.ip << " nexthop " << la->nexthop.ip 	<< " dead " << (b?0:1) << endl;      if (!b) 	loctable->del_node(la->nexthop);      ret_sz = 0;    }    uint outstanding = _forwarded_nodrop.find(la->src.id | la->key);    if (outstanding) {      if (b && la->no_drop) {	_forwarded_nodrop.remove(la->key|la->src.id);	ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " 	  << la->src.ip << " ori " << la->ori.ip << " from "	  << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << 	  la->ori.ip << " successfully forwarded nodrop" << endl;      }else if (outstanding == 1) {	  //send again, to myself	  la->to_lat += TIMEOUT(_me.ip, la->nexthop.ip);	  la->to_num++;	  la->no_drop = true;	  _forwarded_nodrop.remove(la->key|la->src.id);	  la->parallelism = 1;	  ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " 	    << la->src.ip << " ori " << la->ori.ip << " from "	    << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << 	    la->ori.ip << (b?" live":" dead") << " restransmit" << endl;	  next_recurs(la,lr);       } else {	 ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " 	   << la->src.ip << " ori " << la->ori.ip << " from "	   << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << 	   la->ori.ip << (b?" live":" dead") << " outstanding " << (outstanding-1) << endl; 	 _forwarded_nodrop.insert(la->src.id|la->key,(outstanding-1));       }    }else {      if (succ.ip == la->nexthop.ip && (!b)) {	ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " 	  << la->src.ip << " ori " << la->ori.ip << " new succ emerged " << endl;	la->no_drop = true;	next_recurs(la,lr);      } else	ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " 	  << la->src.ip << " ori " << la->ori.ip << " from "	  << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << 	  la->ori.ip << (b?" live":" dead") << " dont care" << (outstanding-1) << endl;     }  }  delete la;  if (lr) delete lr;  return ret_sz;}/* ------------- fix successor routines ----------------- */voidAccordion::stab_succ(void *x){  if (!alive()) {    _stab_basic_running = false;    return;  }  fix_pred(NULL);  _last_stab = now();  delaycb(_stab_basic_timer, &Accordion::stab_succ, (void *)0);}voidAccordion::fix_pred(void *a){  if (!alive()) return;  IDMap pred = loctable->pred(_me.id-1);  get_predsucc_args *gpa = New get_predsucc_args;  get_predsucc_ret *gpr = New get_predsucc_ret;  gpa->m = 1;  gpa->n = pred;  gpa->src = _me;  gpa->src.alivetime = now()-_last_joined_time;  gpr->v.clear();  ADEBUG(3) << " fix_pred " << pred.ip << "," << printID(pred.id)     << " quota " << _rate_queue->quota() << endl;  _rate_queue->do_rpc(pred.ip, &Accordion::get_predsucc_handler,      &Accordion::fix_pred_cb, gpa, gpr, 0, TYPE_FIXPRED_UP, PKT_SZ(0,1), PKT_SZ(2,1),      TIMEOUT(_me.ip,pred.ip));}intAccordion::fix_pred_cb(bool b, get_predsucc_args *gpa, get_predsucc_ret *gpr){  int ret_sz = 0;  if (alive()) {    gpa->n.timestamp = now();    ADEBUG(4) << "fix_pred_cb pred " << gpa->n.ip << (b?" alive":" dead") << endl;    if (b) {      ret_sz = PKT_SZ(2,0);      loctable->update_ifexists(gpa->n);      if (gpr->v.size()>0) 	loctable->add_node(gpr->v[0]);    } else {      loctable->del_node(gpa->n,true);    }     delaycb(10000,&Accordion::fix_succ,(void*)0);  }  delete gpa;  delete gpr;  return ret_sz;}void Accordion::fix_succ(void *a){  if (!alive()) return;  IDMap succ = loctable->succ(_me.id+1, LOC_DEAD-1);  if (succ.ip == 0) {    ADEBUG(1) << "fix_succ locsz " << loctable->size()       << " reschedule join" << endl;    if ((!_join_scheduled) || (now()-_join_scheduled) > 20000)      delaycb(200, &Accordion::join, (Args *)0);    return;  }  get_predsucc_args *gpa = New get_predsucc_args;  get_predsucc_ret *gpr = New get_predsucc_ret;  gpr->v.clear();  gpa->n = succ;  gpa->src = _me;  gpa->src.alivetime = now()-_last_joined_time;  vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc);  if (scs.size() > _nsucc/2)     gpa->m = 1;  else    gpa->m= _nsucc;  ADEBUG(2) << "fix_succ succ " << succ.ip << "," << printID(succ.id) << endl;  _rate_queue->do_rpc(succ.ip, &Accordion::get_predsucc_handler,	&Accordion::fix_succ_cb, gpa,gpr, 0, TYPE_FIXSUCC_UP, PKT_SZ(0,1), PKT_SZ(2*gpa->m,0),	TIMEOUT(_me.ip,succ.ip));}voidAccordion::get_predsucc_handler(get_predsucc_args *gpa,     get_predsucc_ret *gpr){  if (gpa->n.ip == _me.ip)     gpa->n.alivetime = now()-_last_joined_time;  else    abort();  gpr->pred = loctable->pred(_me.id-1);  if (gpa->m)    gpr->v = loctable->succs(_me.id+1,gpa->m);  gpa->src.timestamp = now();  loctable->add_node(gpa->src);}intAccordion::fix_succ_cb(bool b, get_predsucc_args *gpa, get_predsucc_ret *gpr){  int ret_sz = 0;  if (alive()) {    vector<IDMap> scs = loctable->succs(_me.id + 1, _nsucc);    ADEBUG(3) << "fix_succ_cb get " << gpr->v.size() << " succs, old succ " << gpa->n.ip << ","       << printID(gpa->n.id) << (b?" alive":" dead")       << " succsz " << scs.size() << "(" << print_succs(scs)       << ")" << endl;    gpa->n.timestamp = now();    if (b) {      ret_sz = PKT_SZ(1+2*gpr->v.size(),0);      IDMap succ = loctable->succ(_me.id+1);      loctable->update_ifexists(gpa->n);      if (gpr->pred.ip == _me.ip) {      }else if (gpr->pred.ip != _me.ip) {	loctable->add_node(gpr->pred);	IDMap newsucc = loctable->succ(_me.id+1);      }      consolidate_succ_list(gpa->n,scs,gpr->v);      vector<IDMap> newscs = loctable->succs(_me.id+1,100);      if (newscs.size() > 0) {	_est_n = ((ConsistentHash::CHID)-1)/ConsistentHash::distance(_me.id,newscs[newscs.size()-1].id);	_est_n = newscs.size()*_est_n;      }      ADEBUG(3) << "fix_succ_cb pred " << gpr->pred.ip << " new succ " << (newscs.size()>0?newscs[0].ip:0) << "," << 	(newscs.size()>0?printID(newscs[0].id):"??") << " succsz " << newscs.size() << "(" <<	print_succs(newscs) << ")" << " retsz " << ret_sz << " newsz " << gpr->v.size() <<  " est_n " << _est_n << endl;      //delaycb(200,&Accordion::fix_pred,(void *)0);    } else {      loctable->del_node(gpa->n); //XXX: don't delete after one try?      delaycb(200,&Accordion::fix_succ, (void *)0);    }  }else{    _stab_basic_running = false;  }  delete gpa;  delete gpr;  return ret_sz;}voidAccordion::consolidate_succ_list(IDMap n, vector<IDMap> oldlist, vector<IDMap> newlist, bool is_succ){  IDMap sss;  IDMap mee;  uint oldi = 0, newi = 0;  if (is_succ) {    while (oldi < oldlist.size()) {      if (oldlist[oldi].ip == n.ip)	break;      oldi++;    }    oldi++;    mee = _me;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -