⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 accordion.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
    strbuf tmpbuf;    for (unsigned xx = 0; xx < fs.size (); xx++)       tmpbuf << (fs[xx]->id ()>>144) << "," << fs[xx]->knownup () << "," << fs[xx]->age () << " ";    acctrace << (myID>>144) << ": key " << (ra->x>>144)       << " give prev hop " << (loc->id ()>>144) << " para " << (ra->succs_desired >> 24) << " info "       << fs.size () << ": " << tmpbuf << "\n";    fill_nodelistres (&res, fs);  }  ra->succs_desired &= 0x00ffffff;  sentinfo *s = sent[ra->routeid];  if (!s) {     vec<ptr<location> > cs = succs ();    if ((stopearly_ && betweenrightincl (myID, cs.back ()->id (), ra->x)) ||       (betweenrightincl(myID, cs.front ()->id (), ra->x))) {      while (betweenrightincl (myID, ra->x, cs.front ()->id ()))	cs.pop_front ();      if ((cs.size () >= ra->succs_desired) || betweenrightincl(myID, cs.front ()->id (), ra->x)) {	s = New sentinfo (ra->routeid, timenow, 1);	sent.insert (s);	lookups_++;	doaccroute_sendcomplete (ra,cs);      }    }  }  if (s || ((ra->retries & 0x80000000) && (bavail< 0))) { //well it's a duplicate or redundant lookup    acctrace << (myID>>144) << ": doaccroute key " << (ra->x>>144) << " seen " << (s?1:0) << " routeid "       << ra->routeid << " redun " << ((ra->retries & 0x80000000)?1:0) << " bavail " << bavail << "\n";  } else {    lookups_++;    //calculate parallelism     s = New sentinfo (ra->routeid, timenow, 1);    sent.insert (s);    unsigned pp = get_parallelism ();    vec<ptr<location> > nextlocs = fingers_->nexthops (ra->x, pp, s->tried);    s->tried.setsize (nextlocs.size ());    s->outstanding = nextlocs.size ();    int j = 0;    ptr<location> tmp;    for (unsigned i = 0; i < nextlocs.size(); i++) {      assert (nextlocs[i]->id () != myID);      doaccroute_sendroute (ra, nextlocs[i], i, nextlocs.size ());      s->tried[i] = nextlocs[i];      j = i;      while (j >0 && 	  betweenrightincl (myID, s->tried[j-1]->id (), s->tried[j]->id ())) {	tmp = s->tried[j-1];	s->tried[j-1] = s->tried[j];	s->tried[j] = tmp;	j--;      }    }    chordID xxx = nextlocs.size()?(nextlocs[0]->id ()):0;    acctrace << (myID>>144) << ": parallel " << pp << " " << nextlocs.size ()       << " bavail " << bavail << " key " << (ra->x>>144) << " best " << (xxx>>144)       << " knownup " << (nextlocs.size()?nextlocs[0]->knownup ():0)       << " age " <<  (nextlocs.size()?nextlocs[0]->age ():0) << "\n";  }  if (sbp)    sbp->reply (&res);  sbp = NULL;  return;}voidaccordion::doaccroute_sendroute (recroute_route_arg *ra, ptr<location> p, bool redun, unsigned para){  chord_node_wire me;  my_location ()->fill_node (me);  ptr<recroute_route_arg> nra = New refcounted<recroute_route_arg> ();  *nra = *ra;  nra->succs_desired |= (para << 24);  if (redun)    nra->retries |= 0x80000000;  nra->path.setsize (ra->path.size () + 1);  for (unsigned i = 0; i < ra->path.size (); i++)     nra->path[i] = ra->path[i];  nra->path[ra->path.size ()] = me;  chord_nodelistres *res = New chord_nodelistres (CHORD_OK);  long *seqnop = (long *)malloc (sizeof(long));  bytes_sent (BYTES_PER_LOOKUP + ra->path.size ()*BYTES_PER_ID);  long seqno = doRPC (p, accordion_program_1, ACCORDIONPROC_LOOKUP,      nra, res,      wrap (this, &accordion::accroute_hop_cb, seqnop, nra, p, res),      wrap (this, &accordion::accroute_hop_timeout_cb, seqnop, getusec ()/1000, nra, p));  *seqnop = seqno;  acctrace << (myID>>144) << ": doaccroute_sendroute key " << (ra->x>>144)     << " send to " << (p->id ()>>144) << " seqno " << seqno << "\n";}voidaccordion::accroute_hop_cb (long *seqp, ptr<recroute_route_arg> nra,				 ptr<location> p,				 chord_nodelistres *res,				 clnt_stat status){  sentinfo* ss = sent[nra->routeid];  if (!status && res->status == CHORD_OK) {    bytes_sent (res->resok->nlist.size () * BYTES_PER_ID);    if (ss)      ss->outstanding = 0;//i.e. no more unncessary retransmissions    vec<chord_node> nlist;    for (unsigned i = 0; i < res->resok->nlist.size (); i++)      nlist.push_back (make_chord_node (res->resok->nlist[i]));    fingers_->fill_gap_cb (p, nlist, (chordstat)0);        acctrace << (myID>>144) << ": accroute_hop_cb key " << (nra->x>>144)      << " routeid " << nra->routeid << " done forwarded to "       << (p->id ()>>144) << " knownup " << p->knownup () << " age " << p->age ()       <<" learnt " << nlist.size () << " seqno " << (*seqp) << "\n";    delete res;     res = NULL;    return;  }  delete res;}boolaccordion::accroute_hop_timeout_cb (long *seqp, u_int64_t t, ptr<recroute_route_arg> nra,				    ptr<location> p,				    chord_node n,				    int rexmit_number){  u_int64_t now = getusec ()/1000;  sentinfo* ss = sent[nra->routeid];  acctrace << (myID>>144) << ": accroute_hop_TIMEOUT_cb key " << (nra->x>>144) << " next "     << p->id () << " seqno " << (*seqp) << " rex " << rexmit_number << "\n";  if (rexmit_number == 0) {    fingers_->del_node (p->id ());     if (!ss || ss->outstanding == 1 || (!(nra->retries & 0x80000000))) {      nra->retries += (now - t);      vec<ptr<location> > nexts = fingers_->nexthops (nra->x, 1, ss->tried);      if (nexts.size () > 0 && nexts[0]->id () != myID) {	chord_nodelistres *nres = New chord_nodelistres (CHORD_OK);	bytes_sent (BYTES_PER_LOOKUP + nra->path.size ()*BYTES_PER_ID);	long *nseq = (long *)malloc (sizeof(long));	*nseq = doRPC (nexts[0], accordion_program_1, ACCORDIONPROC_LOOKUP,	      nra, nres,	      wrap (this, &accordion::accroute_hop_cb, nseq , nra, nexts[0], nres),	      wrap (this, &accordion::accroute_hop_timeout_cb, nseq , now, nra, nexts[0]));	acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << 	  (nra->x>>144) << " failed " << (p->id ()>>144)<< " resend next " 	  << (nexts[0]->id ()>>144) << " seq " << (*nseq) << " rex " 	  << rexmit_number << " routeid " << nra->routeid << " retries " 	  <<  (nra->retries& 0x7fffffff) << " really " << (now-t) << "\n";	return true;      }        vec<ptr<location> > cs = succs ();      chordID succid =  (cs.size ()>0?cs[0]->id ():0);      if ((stopearly_ && betweenrightincl (myID, cs.back ()->id (), nra->x)) || 	        (betweenrightincl(myID, cs.front ()->id (), nra->x))) {	while (betweenrightincl (myID, nra->x, cs.front ()->id ()))	  cs.pop_front ();	nra->succs_desired &= 0x00ffffff;	if (cs.size () >= nra->succs_desired) {	  doaccroute_sendcomplete (nra,cs);	  //make sure nra is not garbage collected	  acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << 	  (nra->x>>144) << " failed " << (p->id ()>>144)<< " completed rex "	  << rexmit_number << " routeid " << nra->routeid << "\n";	  return true;	}      }      acctrace << (myID>>144) << ": accroute_hop_timeuout_cb key " 	<< (nra->x>>144) << " succ " 	<< (succid>>144)	<< " SHOULD NOT HAVE HAPPEN\n";    }    acctrace << (myID>>144) << ": accroute_hop_timeout_cb key "       << (nra->x>>144) << " next " << (p->id ()>>144) << " rex " <<       rexmit_number << " out " << ss->outstanding << " redun? "       << ((nra->retries&0x80000000)?1:0) << " no retry\n";    if (ss)      ss->outstanding--;  } else       acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " 	<< (nra->x>>144)<< " next " << (p->id ()>>144) << " rex " 	<< rexmit_number << " out " << ss->outstanding << " routeid " << 	nra->routeid << " failed dropped\n";  return true;}voidaccordion::docomplete (user_args *sbp, recroute_complete_arg *ca, ptr<location> src){  route_accordion *router = routers[ca->routeid];  sentinfo* ss = sent[ca->routeid];  if (ss)    ss->outstanding = 0;  if (!router) {    acctrace << (myID>>144) << ": docomplete: unknown routeid "       << ca->routeid << " from host " << (src->id ()>>144) << "\n";    sbp->reply (NULL);    return;  }  strbuf s;  s << (myID>>144)  << ": docomplete: key " << (router->key () >>144)    << " routeid " << ca->routeid << " num_succ " <<  ca->body.robody->successors.size ()    << " returned from " << (src->id ()>>144) << " retries: " << (ca->retries&0x7fffffff);  chord_node n;  for (size_t i = 0; i < ca->body.robody->successors.size (); i++) {    n = make_chord_node (ca->body.robody->successors[i]);    locations->insert (n);    if (i == 0)       s << " succ " << (n.x>>144);  }  acctrace << s << "\n";  routers.remove (router);  router->handle_complete (sbp, ca);}voidaccordion::doaccroute_sendcomplete ( recroute_route_arg *ra,				    const vec<ptr<location> > cs){  //this has a lot of copy-n-paste from recroute.C  chord_node_wire me;  my_location ()->fill_node (me);  ptr<recroute_complete_arg> ca = New refcounted<recroute_complete_arg> ();  ca->body.set_status ((recroute_complete_stat)CHORD_OK);  ca->routeid = ra->routeid;    ca->path.setsize (ra->path.size () + 1);  for (size_t i = 0; i < ra->path.size (); i++) {    ca->path[i] = ra->path[i];  }  ca->path[ra->path.size ()] = me;  u_long m = ra->succs_desired;  u_long tofill = (cs.size () < m) ? cs.size () : m;  ca->body.robody->successors.setsize (tofill);  for (size_t i = 0; i < tofill; i++)    cs[i]->fill_node (ca->body.robody->successors[i]);  ca->retries = ra->retries;    ptr<location> l = locations->insert(make_chord_node (ra->origin));  chordID xxx = cs.size () > 0?cs[0]->id ():0;  acctrace << (myID>>144) << ": doaccroute_sendcomplete key " << (ra->x>>144)     << " routeid " << ra->routeid << " desired " << ra->succs_desired    << " rt " << cs.size () <<  " " << (xxx>>144) << " ori " << (l->id ()>>144) <<"\n";  bytes_sent (BYTES_PER_LOOKUPCOMPLETE + (ra->path.size ()+ tofill)* BYTES_PER_ID);  doRPC (l, accordion_program_1, ACCORDIONPROC_LOOKUP_COMPLETE,	 ca, NULL,	 wrap (this, &accordion::accroute_sent_complete_cb, l, ra->x),	 wrap (this, &accordion::accroute_sent_complete_timeout_cb, l, ra->x));}voidaccordion::accroute_sent_complete_cb(ptr<location> l, chordID x, clnt_stat status){  if (status)    acctrace << (myID>>144) << ": accroute_complete lost key "       << (x>>144) << " status " << status << ".\n";}boolaccordion::accroute_sent_complete_timeout_cb(ptr<location> l, chordID x, chord_node n, int rexmit_number){  acctrace << (myID>>144) << ": accroute_sent_complete_timeout_cb key " << (x>>144)    << " rexmit " << rexmit_number << "\n";  return false;}voidaccordion::fill_nodelistres (chord_nodelistres *res, vec<ptr<location> > fs){  res->resok->nlist.setsize (fs.size ());  for (size_t i = 0; i < fs.size (); i++)    fs[i]->fill_node (res->resok->nlist[i]);}voidaccordion::update_bavail (){  if (timenow > 0 && bavail_t > 0)    bavail += (budget_ * (timenow-bavail_t));  bavail_t = timenow;  if (bavail > burst_)     bavail = burst_;}unsigned accordion::get_parallelism (){  update_bavail();  int p = bavail/(BYTES_PER_LOOKUP+BYTES_PER_ID);  if (p <= 0)    return 1;  else     return para_;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -