⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stp_manager.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
  cwind_cwind.push_back (cwind);  cwind_time.push_back ((getusec () - st)/1000000.0);  if (cwind_cwind.size () > 1000) cwind_cwind.pop_front ();  if (cwind_time.size () > 1000) cwind_time.pop_front ();    /*  warn << "window " << getusec ()/1000 << " " << (int)(cwind*1000) << " " << (int)(ssthresh*1000) << "\n";  */}voidstp_manager::enqueue_rpc (RPC_delay_args *args) {  num_qed++;  qued_hist.push_back (num_qed);  if (qued_hist.size () > 1000) qued_hist.pop_back ();  Q.insert_tail (args);}voidstp_manager::setup_rexmit_timer (ptr<location> from, ptr<location> l, 				 long *sec, long *nsec){#define MIN_SAMPLES 10  float alat;  if (nrpc < 50) {    alat = 1000000;  } else if (false && l &&   //if we've got a measurement, use it      l->nrpc () > 50) {    //      warn << l->id () << ": using " << l->nrpc () << "measurments: " << (int)l->distance () << " + 4*" << (int)l->a_var () << "\n";    alat = 1.5*(l->distance() + 6*l->a_var () + 5000);  } else if (l	     && from	     && (l->coords ().size () > 0)	     && (from->coords ().size () > 0)) {    float dist = Coord::distance_f (from->coords (), l->coords ());    alat = dist + 6.0*c_err + 10*c_var + 15000;     //scale it to be safe. the 8 comes from an analysis for log files    // I also tried using the variance but average works better.     // With 8 we'll do about 1 percent spurious retransmits  } else    alat = 1000000;    //statistics  timers.push_back (alat);  if (timers.size () > 1000) timers.pop_front ();    *sec = (long)(alat / 1000000);  *nsec = ((long)alat % 1000000) * 1000;    if (*nsec < 0 || *sec < 0 || *sec > 1) {    *sec = 1;    *nsec = 0;  }}void stp_manager::stats (){  char buf[1024];  stream_rpcm->stats ();  rpc_manager::stats ();    sprintf(buf, "  Average latency/variance: %f/%f\n", a_lat, a_var);  warnx << buf;  sprintf(buf, "  Average cwind: %f\n", cwind_cum/num_cwind_samples);  if (shortstats) return;  warnx << "Timer history:\n";  for (unsigned int i = 0; i < timers.size (); i++) {    sprintf (buf, "%f", timers[i]);    warnx << "t: " << buf << "\n";  }  warnx << "Latencies:\n";  for (unsigned int i = 0; i < lat_history.size (); i++) {    sprintf (buf, "%f", lat_history[i]);    warnx << "lat: " << buf << "\n";  }  warnx << "Latencies (in q):\n";  for (unsigned int i = 0; i < lat_inq.size (); i++) {    warnx << "lat(q): " << lat_inq[i] << "\n";  }  warnx << "cwind over time:\n";  for (unsigned int i = 0; i < cwind_cwind.size (); i++) {    sprintf (buf, "%f %f", cwind_time[i], cwind_cwind[i]);    warnx << "cw: " << buf << "\n";  }  warnx << "queue length over time:\n";  for (unsigned int i = 0; i < qued_hist.size (); i++) {    sprintf (buf, "%f %ld", cwind_time[i], qued_hist[i]);    warnx << "qued: " << buf << "\n";  }  warnx << "current RPCs queued for transmission pending window: " << num_qed << "\n";  RPC_delay_args *args = Q.first;  //  const rpcgen_table *rtp;  u_int64_t now = getusec ();  while (args) {    void *args_as_pointer = args->in.get ();    int real_prog = ((dorpc_arg *)args_as_pointer)->progno;    int real_procno = ((dorpc_arg *)args_as_pointer)->procno;    long diff = now - args->now;    warn << "   " << real_prog << "." << real_procno << " for " << args->l->id() << " queued for " << diff << "\n";    /*    rtp = &(args->prog.tbl[args->procno]);    if (rtp)       warn << "  " << args->prog.name << "." << rtp->name << " for " << args->l->id() << " queued for " << diff << "\n";    else       warn << "stp_manager::stats: WTF " << (u_int)&args->prog << "@" << args->procno << "\n";    */    args = Q.next (args);  }  warnx << "per program bytes\n";  rpcstats *s = rpc_stats_tab.first ();  while (s) {    warnx << "  " << s->key << "\n";    warnx << "    calls (bytes/num):   " << s->call_bytes	  << "/" << s->ncall << "\n";    warnx << "    rexmits (bytes/num): " << s->rexmit_bytes	  << "/" << s->nrexmit << "\n";    warnx << "    replies (bytes/num): " << s->reply_bytes	  << "/" << s->nreply << "\n";    s = rpc_stats_tab.next (s);  }}// ------------- rpccb_chord ----------------rpccb_chord *rpccb_chord::alloc (ptr<aclnt> c,		    aclnt_cb cb,		    callback<bool>::ptr u_tmo,		    ptr<void> in,		    void *out,		    int procno,		    struct sockaddr *dest) {  xdrsuio x (XDR_ENCODE);  const rpc_program &prog = c->rp;    //re-write the timestamp  dorpc_arg *args = (dorpc_arg *)in.get ();  args->send_time = getusec ();  if (!aclnt::marshal_call (x, authnone_create (), prog.progno, 			    prog.versno, procno, 			    prog.tbl[procno].xdr_arg,			    in)) {    warn << "marshalling failed\n";    return NULL;  }    assert (x.iov ()[0].iov_len >= 4);  u_int32_t &xid = *reinterpret_cast<u_int32_t *> (x.iov ()[0].iov_base);  if (!c->xi->xh->reliable || cb != aclnt_cb_null) {    u_int32_t txid;    while (c->xi->xidtab[txid = (*next_xid) ()]);    xid = txid;  }  // per program/proc RPC stats  suio *s = x.uio ();  track_call (prog, procno, s->resid ());  outbytes += s->resid ();    ptr<bool> deleted  = New refcounted<bool> (false);  rpccb_chord *ret = New rpccb_chord (c,				      x,				      cb,				      u_tmo,				      out,				      prog.tbl[procno].xdr_res,				      dest,				      deleted,				      in,				      procno);    return ret;}voidrpccb_chord::send (long _sec, long _nsec) {  rexmits = 0;  sec = _sec;  nsec = _nsec;  if (nsec < 0 || sec < 0)    panic ("[send to chord-dev@amsterdam.lcs.mit.edu]: sec %ld, nsec %ld\n", sec, nsec);  tmo = delaycb (sec, nsec, wrap (this, &rpccb_chord::timeout_cb, deleted));  //  warn ("%s xmited %d:%06d\n", gettime().cstr(), int (sec), int (nsec/1000));  //  warn << "RPCTIMING: " << getusec () << " sent " << (u_int)outmem << "\n";  xmit (0);}//do the exponential backoff on tmovoidrpccb_chord::reset_tmo (){#ifdef VERBOSE_LOG  long oldsec = sec;  long oldnsec = nsec;#endif /* VERBOSE_LOG */  nsec *= 2;  sec *= 2;  while (nsec >= 1000000000) {    nsec -= 1000000000;    sec += 1;  }  if (sec > 2) sec = 2;#ifdef VERBOSE_LOG  warn << inet_ntoa (((sockaddr_in *)&s)->sin_addr)       << ": timer was " << oldsec << "." << oldnsec       << " now is " << sec << "." << nsec       << "; rexmits is " << rexmits << "\n";#endif /* VERBOSE_LOG */    if (tmo) timecb_remove (tmo);  tmo = delaycb (sec, nsec, wrap (this, &rpccb_chord::timeout_cb, deleted));}voidrpccb_chord::timeout_cb (ptr<bool> del){  if (*del) return;  tmo = NULL;  bool cancel = false;  if (utmo)    cancel = utmo ();  if (rexmits > MAX_REXMIT || cancel) {    timeout ();    return;  } else {    if (nsec < 0 || sec < 0)      panic ("1 timeout_cb: sec %ld, nsec %ld\n", sec, nsec);    sockaddr_in *s = (sockaddr_in *)dest;    dorpc_arg *args = (dorpc_arg *)in.get ();    warnx << gettime () << " REXMIT " << strbuf ("%x", xid)	  << " " << args->progno << ":" << args->procno	  << " rexmits " << rexmits << ", timeout " 	  << sec*1000 + nsec/(1000*1000) << " ms, destined for " 	  << inet_ntoa (s->sin_addr) << " out is " << (u_int)outmem << "\n";    //re-write the timestamp    args->send_time = getusec ();    //remarshall the args    xdrsuio x (XDR_ENCODE);    const rpc_program &prog = c->rp;    if (!aclnt::marshal_call (x, authnone_create (), prog.progno, 			      prog.versno, procno, 			      prog.tbl[procno].xdr_arg,			      in)) {      fatal << "error remarshalling\n";    }    track_rexmit (prog, procno, x.uio()->resid ());    //keep our old xid     assert (x.iov ()[0].iov_len >= 4);    u_int32_t &txid = *reinterpret_cast<u_int32_t *> (x.iov ()[0].iov_base);    txid = xid;    //update the msg buffer so we send with the new timestamp    unsigned int l = x.uio ()->resid ();    assert (l == msglen);    char *newbuf = suio_flatten (x.uio ());    memcpy (msgbuf, newbuf, msglen);    xfree (newbuf);    //send it    xmit (rexmits);            if (rexmits == MAX_REXMIT && sec < MIN_RPC_FAILURE_TIMER) {      sec = MIN_RPC_FAILURE_TIMER;      nsec = 0;    }     reset_tmo ();    rexmits++;  }}voidrpccb_chord::finish_cb (aclnt_cb cb, ptr<bool> del, clnt_stat err) {  if (*del) return;  *del = true;  cb (err);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -