⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 accordion.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
  for (HashMap<ConsistentHash::CHID, Time>::iterator i = _dead.begin();            i != _dead.end(); ++i) {    list<IDMap>* s = (*i).value();    if (s)  delete s;  }  */}/* ------------------------ lookup ----------------------------------*/voidAccordion::lookup(Args *args){  IDMap succ = loctable->succ(_me.id+1);  if (!succ.ip) {    if (!_join_scheduled || (now()-_join_scheduled) > 20000)       delaycb(0,&Accordion::join,(Args *)0);    ADEBUG(2) << "lookup key failed not yet joined" << endl;    record_lookup_stat(_me.ip, _me.ip, 0, false, false, 0, 0, 0);    return;  }  lookup_args la;  lookup_ret lr;  bzero(&la,sizeof(lookup_args));  la.key = args->nget<ConsistentHash::CHID>("key",0,16);  la.no_drop = true;  la.nexthop = _me;  la.from = _me;  la.from.alivetime = now()-_last_joined_time;  la.src = _me;  la.src.alivetime = now()-_last_joined_time;  la.src.timestamp = now();  la.ori.ip = 0;  la.m = 1;  la.parallelism = _parallelism;  la.type = TYPE_USER_LOOKUP;  la.learnsz = _learn_num;  la.deadnodes.clear();  lr.done = false;  lr.v.clear();  lr.is_succ = false;  _outstanding_lookups.insert(la.key, now());  ADEBUG(2) << "start lookup key " << printID(la.key) << endl;  if (_recurs)     next_recurs(&la,NULL);  else     next_iter(&la,&lr);}/* ------------------------ lookup (iterative) ------------------------*/voidAccordion::next_iter(lookup_args *la, lookup_ret *lr){  if (now() >= _next_adjust)    adjust_parallelism();  IDMap succ = loctable->succ(_me.id+1);  if (ConsistentHash::between(_me.id,succ.id,la->key)) {    la->from = _me;    la->nexthop = _me;    lr->v.push_back(succ);    lr->done = true;    donelookup_handler(la,lr);    return;  }else if (lr->done) {        _progress.remove(la->key);    la->nexthop = _me;    donelookup_handler(la,lr);    return;  }  double ttt;  int para;  if (!_fixed_stab_int) {    //calculate the parallelism needed    para = (_rate_queue->quota()+_burst_sz)/(2*PKT_OVERHEAD+ 8*_learn_num);   // para = para/est_hops;    if (para > _parallelism)      para = _parallelism;    else if (para > la->parallelism)      para = la->parallelism;    else if (para <= 0)      para = 1;    ttt = para > 1? (1-(exp(log(0.1)/(double)para))):0.9;  } else {    para = _parallelism;    ttt = _fixed_lookup_to;  }  vector<IDMap> nexthops = loctable->preds(la->key, para, LOC_HEALTHY, ttt);  ConsistentHash::CHID mostprog = _progress.find(la->key);  list<IDMap> *s = _sent.find(la->key);  if (!s) {    s = New list<IDMap>;    s->clear();    _sent.insert(la->key,s);  }  list<IDMap> *d = _dead.find(la->key);  if (!d) {    d = New list<IDMap>;    d->clear();    _dead.insert(la->key,d);  }  uint sentout = 0;  uint i=0,j=0;  IDMap nh;  bool seen;  while (sentout < para) {    if (i < nexthops.size())       nh = nexthops[i++];    else if (lr && j < lr->v.size())       nh = lr->v[j++];    else      break;    if (!mostprog || ConsistentHash::between(mostprog,la->key,nh.id)) {      seen = false;      list<IDMap>::iterator li;      for (li = s->begin(); li != s->end(); ++li) {	if ((*li).ip == nh.ip) {	  seen = true;	  break;	}else if (ConsistentHash::between((*li).id,la->key,nh.id)) {	  break;	}      }      if (!seen) {	s->insert(li,nh);	lookup_args *lla = New lookup_args;	lla->key = la->key;	lla->no_drop = la->no_drop;        lla->from = la->from;	lla->src = la->src;	lla->ori = la->ori;	lla->m = la->m;	lla->parallelism = la->parallelism;	lla->type = la->type;	lla->learnsz = la->learnsz;	lla->to_num = la->to_num;	lla->to_lat = la->to_lat;	lla->timeout = (Time) ttt;	lla->from.alivetime = now() - _last_joined_time;	lla->prevhop = la->nexthop;	lla->nexthop = nh;	lla->hops = la->hops+1;	lla->deadnodes.clear();	for (list<IDMap>::iterator di = d->begin(); di != d->end(); ++di) {	  if (ConsistentHash::between(nh.id,la->key,(*di).id))	    lla->deadnodes.push_back((*di));	  else	    break;	}	lookup_ret *llr = New lookup_ret;	llr->is_succ = false;	llr->done = false;	llr->v.clear();	ADEBUG(4) << " key moha " << printID(la->key) << " to " << nh.ip 	  << "," << printID(nh.id) << " dead " 	  << (lla->deadnodes.size()?lla->deadnodes[0].ip:0) << endl;	_rate_queue->do_rpc(nh.ip, &Accordion::next,	    &Accordion::next_iter_cb, lla, llr, 0, TYPE_USER_LOOKUP, 	    PKT_SZ(1+2*lla->deadnodes.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, nh.ip));	sentout++;      }    }  }  uint outstanding =  _forwarded_nodrop.find(la->key);  ADEBUG(4) << "next_iter key " << printID(la->key) << " mostprog "     << printID(mostprog) << " from " << la->nexthop.ip  << "," << printID(la->nexthop.id) << " quota " << _rate_queue->quota()   << "," << para << "," << _parallelism << "," << sentout<< "," << outstanding << endl;  if (!outstanding && !sentout) {    assert(nexthops.size());    nh = nexthops[0];    lookup_args *lla = New lookup_args;    lla->key = la->key;    lla->no_drop = la->no_drop;    lla->from = la->from;    lla->src = la->src;    lla->ori = la->ori;    lla->m = la->m;    lla->parallelism = la->parallelism;    lla->type = la->type;    lla->learnsz = la->learnsz;    lla->to_num = la->to_num;     lla->to_lat = la->to_lat;    lla->timeout = (Time) ttt;    lla->from.alivetime = now() - _last_joined_time;    lla->prevhop = la->nexthop;    lla->nexthop = nh;    lla->hops = la->hops+1;    lla->deadnodes.clear();    for (list<IDMap>::iterator di = d->begin(); di != d->end(); ++di) {      if (ConsistentHash::between(nh.id,la->key,(*di).id))	lla->deadnodes.push_back((*di));      else	break;    }    lookup_ret *llr = New lookup_ret;    llr->is_succ = false;    llr->done = false;    llr->v.clear();    ADEBUG(4) << " key resend " << printID(la->key) << " to " << nh.ip       << "," << printID(nh.id) << " dead "       << (lla->deadnodes.size()?lla->deadnodes[0].ip:0) << endl;    _rate_queue->do_rpc(nh.ip, &Accordion::next,	&Accordion::next_iter_cb, lla, llr, 0, TYPE_USER_LOOKUP, 	PKT_SZ(1+2*lla->deadnodes.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, nh.ip));    sentout++;  }  _forwarded_nodrop.insert(la->key,outstanding+sentout);}voidAccordion::next(lookup_args *la, lookup_ret *lr){  loctable->update_ifexists(la->from,0);  la->nexthop.alivetime = now() - _last_joined_time;  for (uint i = 0; i < la->deadnodes.size(); i++)     loctable->del_node(la->deadnodes[i]);  IDMap succ = loctable->succ(_me.id+1);  if (ConsistentHash::between(_me.id,succ.id,la->key)) {    lr->done = true;    la->from = _me;    lr->v = loctable->succs(_me.id+1,la->m);  } else {    assert(la->nexthop.ip == _me.ip);    if (!_fixed_stab_int)       //lr->v = loctable->next_close_hops(la->key, la->learnsz, la->from, la->timeout);      lr->v = loctable->preds(la->key, la->learnsz, LOC_HEALTHY, la->timeout);    else      lr->v = loctable->preds(la->key, la->learnsz, 	  LOC_HEALTHY, _fixed_stab_to);  }  ADEBUG(4) << " KEY " << printID(la->key) << " from " << la->from.ip << " dead " <<       (la->deadnodes.size()?la->deadnodes[0].ip:0) << " next " << (lr->v.size()>0?lr->v[0].ip:0)       << "," << printID((lr->v.size()?lr->v[0].id:0)) <<       " succ " << succ.ip << "," << printID(succ.id) <<       " locsz " << loctable->live_size(la->timeout) << endl;}intAccordion::next_iter_cb(bool b, lookup_args *la, lookup_ret *lr){  int ret_sz = 0;  uint outstanding =  _forwarded_nodrop.find(la->key);  outstanding--;  if (alive()) {    Time delta = now()-la->nexthop.timestamp;    if (delta < la->nexthop.alivetime && delta > 5000)      add_stat((double)(la->nexthop.alivetime-delta)/(double)(la->nexthop.alivetime), b);    la->nexthop.timestamp = now();    if ((b) && (la->nexthop.ip!=_me.ip)) {      	loctable->update_ifexists(la->nexthop,0);	for (uint i = 0; i < lr->v.size(); i++)  	  loctable->add_node(lr->v[i]);      ADEBUG(4) << "next_iter_cb key " << printID(la->key) << "src " 	<< la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip	<< "," << printID(la->nexthop.id) << " learnsz " << la->learnsz << " learnt " << lr->v.size() << ": " 	<< print_succs(lr->v) << " outstanding " << outstanding << " locsz " 	<< loctable->size(LOC_HEALTHY,0.9) << " livesz " << loctable->live_size(0.9) << " done? " << (lr->done?1:0)<< endl;      ret_sz = PKT_SZ(2*lr->v.size(),0);      /*      if ((la->prevhop.ip!=_me.ip) && lr->v.size() > 0) {	alert_args *aa = New alert_args;	aa->v.clear();	aa->dn.ip = 0;	for (uint i = 0; i < lr->v.size(); i++) 	  aa->v.push_back(lr->v[i]);	_rate_queue->do_rpc(la->prevhop.ip, &Accordion::alert_nodes,	           &Accordion::alert_cb, aa, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP,		           PKT_SZ(2*lr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->prevhop.ip));      }      */    } else {      la->to_num++;      la->to_lat += TIMEOUT(_me.ip, la->nexthop.ip);      loctable->del_node(la->nexthop);      ret_sz = 0;      list<IDMap> *d = _dead.find(la->key);      assert(d);      if (d) {	list<IDMap>::iterator di;	for (di = d->begin(); di!=d->end();++di) {	  if (ConsistentHash::between((*di).id,la->key,la->nexthop.id)) {	    d->insert(di,la->nexthop);	    break;	  }	}	if (di == d->end())	  d->push_back(la->nexthop);      }      ADEBUG(4) << "next_iter_cb key " << printID(la->key) << "src " 	<< la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip	<< "," << printID(la->nexthop.id) << " DEAD " << (d?d->size():0) << endl;/*      if (la->prevhop.ip!=_me.ip) {	alert_args *aa = New alert_args;	aa->v.clear();	aa->dn = la->nexthop;	_rate_queue->do_rpc(la->prevhop.ip, &Accordion::alert_nodes,	    &Accordion::alert_cb, aa, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP,	    PKT_SZ(2,0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->prevhop.ip));      }      */    }    Time t = _outstanding_lookups.find(la->key);    if (outstanding)      _forwarded_nodrop.insert(la->key,outstanding);    else      _forwarded_nodrop.remove(la->key);    if (t) {      ConsistentHash::CHID prog = _progress.find(la->key);      if (b && (!prog || ConsistentHash::between(prog,la->key,la->nexthop.id))) { 	_progress.insert(la->key,la->nexthop.id);	next_iter(la,lr);      }else if (!outstanding &&  _outstanding_lookups.find(la->key)) 	next_iter(la,lr);    }    outstanding = _forwarded_nodrop.find(la->key);    if (!outstanding)       alert_lookup_nodes(la->key,la->timeout);  }  delete la;  if (lr) delete lr;  return ret_sz;}voidAccordion::alert_lookup_nodes(ConsistentHash::CHID key, Time to){  list<IDMap> *s = _sent.find(key);  if (!s) return;  list<IDMap> *dd = _dead.find(key);  vector<IDMap> v;  v.clear();  while (s->size()) {    if (dd->size() && s->front().ip == dd->front().ip) {      s->pop_front();      dd->pop_front();    }else if (!dd->size() 	|| ConsistentHash::between(dd->front().id,key,s->front().id)) {      v.push_back(s->front());      s->pop_front();    }else       dd->pop_front();  }  //if (_rate_queue->critical()) return;  //vector<IDMap> v = loctable->preds(key, _learn_num, LOC_HEALTHY, to);  for (list<IDMap>::iterator i = s->begin(); i != s->end(); ++i) {    alert_args *la = New alert_args;    la->v.clear();    for (uint j = 0; j < v.size(); j++)       la->v.push_back(v[j]);    la->d.clear();    la->k = key;    la->src = _me;    for (list<IDMap>::iterator jj = dd->begin(); jj!=dd->end();++jj)      la->d.push_back(*jj);    _rate_queue->do_rpc((*i).ip, &Accordion::alert_nodes,	&Accordion::alert_cb, la, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP,	PKT_SZ(2*(la->v.size()+la->d.size()),0), 	PKT_SZ(0,0),	TIMEOUT(_me.ip, (*i).ip));   }  if (s) {    delete s;    _sent.remove(key);  }  if (dd) {    delete dd;    _dead.remove(key);  }}voidAccordion::alert_nodes(alert_args *la, lookup_ret *lr){  ADEBUG(4) << " alert_nodes key " << printID(la->k) << " "     << print_succs(la->v) << " " <<     la->src.ip << " dead " << print_succs(la->d) << endl;  for (uint i = 0; i < la->v.size(); i++) {    if (la->v[i].ip == 0)       ADEBUG(4) << " wierd " << endl;    loctable->add_node(la->v[i]);  }  for (uint i = 0; i < la->d.size(); i++)     loctable->del_node(la->d[i]);}intAccordion::alert_cb(bool b, alert_args *la, lookup_ret *lr){  if (la)    delete la;  return 0;}/* ------------------------ lookup (recursive) ----------------------- */voidAccordion::donelookup_handler(lookup_args *la, lookup_ret *lr){  if (la->nexthop.ip == _me.ip) {    la->nexthop.alivetime = now() - _last_joined_time;  }else    abort();  if (la->from.ip!=_me.ip) {    la->from.timestamp = now();    loctable->add_node(la->from);  }  if (la->ori.ip) {    lookup_args *lla = New lookup_args;    bcopy(la,lla,sizeof(lookup_args));    la->nexthop = la->ori;    lookup_ret *llr = New lookup_ret;    llr->v = lr->v;    if (lr->v.size() > 0) {      la->ori.timestamp = now();      loctable->add_node(la->ori);    }    _rate_queue->do_rpc(la->ori.ip, &Accordion::join_handler,	&Accordion::null_cb, lla, llr, 0, TYPE_JOIN_LOOKUP, 	PKT_SZ(2*llr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->ori.ip));     return;  };  assert(lr->done);  Time t = _outstanding_lookups.find(la->key);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -