📄 stp_manager.c
字号:
cwind_cwind.push_back (cwind); cwind_time.push_back ((getusec () - st)/1000000.0); if (cwind_cwind.size () > 1000) cwind_cwind.pop_front (); if (cwind_time.size () > 1000) cwind_time.pop_front (); /* warn << "window " << getusec ()/1000 << " " << (int)(cwind*1000) << " " << (int)(ssthresh*1000) << "\n"; */}voidstp_manager::enqueue_rpc (RPC_delay_args *args) { num_qed++; qued_hist.push_back (num_qed); if (qued_hist.size () > 1000) qued_hist.pop_back (); Q.insert_tail (args);}voidstp_manager::setup_rexmit_timer (ptr<location> from, ptr<location> l, long *sec, long *nsec){#define MIN_SAMPLES 10 float alat; if (nrpc < 50) { alat = 1000000; } else if (false && l && //if we've got a measurement, use it l->nrpc () > 50) { // warn << l->id () << ": using " << l->nrpc () << "measurments: " << (int)l->distance () << " + 4*" << (int)l->a_var () << "\n"; alat = 1.5*(l->distance() + 6*l->a_var () + 5000); } else if (l && from && (l->coords ().size () > 0) && (from->coords ().size () > 0)) { float dist = Coord::distance_f (from->coords (), l->coords ()); alat = dist + 6.0*c_err + 10*c_var + 15000; //scale it to be safe. the 8 comes from an analysis for log files // I also tried using the variance but average works better. // With 8 we'll do about 1 percent spurious retransmits } else alat = 1000000; //statistics timers.push_back (alat); if (timers.size () > 1000) timers.pop_front (); *sec = (long)(alat / 1000000); *nsec = ((long)alat % 1000000) * 1000; if (*nsec < 0 || *sec < 0 || *sec > 1) { *sec = 1; *nsec = 0; }}void stp_manager::stats (){ char buf[1024]; stream_rpcm->stats (); rpc_manager::stats (); sprintf(buf, " Average latency/variance: %f/%f\n", a_lat, a_var); warnx << buf; sprintf(buf, " Average cwind: %f\n", cwind_cum/num_cwind_samples); if (shortstats) return; warnx << "Timer history:\n"; for (unsigned int i = 0; i < timers.size (); i++) { sprintf (buf, "%f", timers[i]); warnx << "t: " << buf << "\n"; } warnx << "Latencies:\n"; for (unsigned int i = 0; i < lat_history.size (); i++) { sprintf (buf, "%f", lat_history[i]); warnx << "lat: " << buf << "\n"; } warnx << "Latencies (in q):\n"; for (unsigned int i = 0; i < lat_inq.size (); i++) { warnx << "lat(q): " << lat_inq[i] << "\n"; } warnx << "cwind over time:\n"; for (unsigned int i = 0; i < cwind_cwind.size (); i++) { sprintf (buf, "%f %f", cwind_time[i], cwind_cwind[i]); warnx << "cw: " << buf << "\n"; } warnx << "queue length over time:\n"; for (unsigned int i = 0; i < qued_hist.size (); i++) { sprintf (buf, "%f %ld", cwind_time[i], qued_hist[i]); warnx << "qued: " << buf << "\n"; } warnx << "current RPCs queued for transmission pending window: " << num_qed << "\n"; RPC_delay_args *args = Q.first; // const rpcgen_table *rtp; u_int64_t now = getusec (); while (args) { void *args_as_pointer = args->in.get (); int real_prog = ((dorpc_arg *)args_as_pointer)->progno; int real_procno = ((dorpc_arg *)args_as_pointer)->procno; long diff = now - args->now; warn << " " << real_prog << "." << real_procno << " for " << args->l->id() << " queued for " << diff << "\n"; /* rtp = &(args->prog.tbl[args->procno]); if (rtp) warn << " " << args->prog.name << "." << rtp->name << " for " << args->l->id() << " queued for " << diff << "\n"; else warn << "stp_manager::stats: WTF " << (u_int)&args->prog << "@" << args->procno << "\n"; */ args = Q.next (args); } warnx << "per program bytes\n"; rpcstats *s = rpc_stats_tab.first (); while (s) { warnx << " " << s->key << "\n"; warnx << " calls (bytes/num): " << s->call_bytes << "/" << s->ncall << "\n"; warnx << " rexmits (bytes/num): " << s->rexmit_bytes << "/" << s->nrexmit << "\n"; warnx << " replies (bytes/num): " << s->reply_bytes << "/" << s->nreply << "\n"; s = rpc_stats_tab.next (s); }}// ------------- rpccb_chord ----------------rpccb_chord *rpccb_chord::alloc (ptr<aclnt> c, aclnt_cb cb, callback<bool>::ptr u_tmo, ptr<void> in, void *out, int procno, struct sockaddr *dest) { xdrsuio x (XDR_ENCODE); const rpc_program &prog = c->rp; //re-write the timestamp dorpc_arg *args = (dorpc_arg *)in.get (); args->send_time = getusec (); if (!aclnt::marshal_call (x, authnone_create (), prog.progno, prog.versno, procno, prog.tbl[procno].xdr_arg, in)) { warn << "marshalling failed\n"; return NULL; } assert (x.iov ()[0].iov_len >= 4); u_int32_t &xid = *reinterpret_cast<u_int32_t *> (x.iov ()[0].iov_base); if (!c->xi->xh->reliable || cb != aclnt_cb_null) { u_int32_t txid; while (c->xi->xidtab[txid = (*next_xid) ()]); xid = txid; } // per program/proc RPC stats suio *s = x.uio (); track_call (prog, procno, s->resid ()); outbytes += s->resid (); ptr<bool> deleted = New refcounted<bool> (false); rpccb_chord *ret = New rpccb_chord (c, x, cb, u_tmo, out, prog.tbl[procno].xdr_res, dest, deleted, in, procno); return ret;}voidrpccb_chord::send (long _sec, long _nsec) { rexmits = 0; sec = _sec; nsec = _nsec; if (nsec < 0 || sec < 0) panic ("[send to chord-dev@amsterdam.lcs.mit.edu]: sec %ld, nsec %ld\n", sec, nsec); tmo = delaycb (sec, nsec, wrap (this, &rpccb_chord::timeout_cb, deleted)); // warn ("%s xmited %d:%06d\n", gettime().cstr(), int (sec), int (nsec/1000)); // warn << "RPCTIMING: " << getusec () << " sent " << (u_int)outmem << "\n"; xmit (0);}//do the exponential backoff on tmovoidrpccb_chord::reset_tmo (){#ifdef VERBOSE_LOG long oldsec = sec; long oldnsec = nsec;#endif /* VERBOSE_LOG */ nsec *= 2; sec *= 2; while (nsec >= 1000000000) { nsec -= 1000000000; sec += 1; } if (sec > 2) sec = 2;#ifdef VERBOSE_LOG warn << inet_ntoa (((sockaddr_in *)&s)->sin_addr) << ": timer was " << oldsec << "." << oldnsec << " now is " << sec << "." << nsec << "; rexmits is " << rexmits << "\n";#endif /* VERBOSE_LOG */ if (tmo) timecb_remove (tmo); tmo = delaycb (sec, nsec, wrap (this, &rpccb_chord::timeout_cb, deleted));}voidrpccb_chord::timeout_cb (ptr<bool> del){ if (*del) return; tmo = NULL; bool cancel = false; if (utmo) cancel = utmo (); if (rexmits > MAX_REXMIT || cancel) { timeout (); return; } else { if (nsec < 0 || sec < 0) panic ("1 timeout_cb: sec %ld, nsec %ld\n", sec, nsec); sockaddr_in *s = (sockaddr_in *)dest; dorpc_arg *args = (dorpc_arg *)in.get (); warnx << gettime () << " REXMIT " << strbuf ("%x", xid) << " " << args->progno << ":" << args->procno << " rexmits " << rexmits << ", timeout " << sec*1000 + nsec/(1000*1000) << " ms, destined for " << inet_ntoa (s->sin_addr) << " out is " << (u_int)outmem << "\n"; //re-write the timestamp args->send_time = getusec (); //remarshall the args xdrsuio x (XDR_ENCODE); const rpc_program &prog = c->rp; if (!aclnt::marshal_call (x, authnone_create (), prog.progno, prog.versno, procno, prog.tbl[procno].xdr_arg, in)) { fatal << "error remarshalling\n"; } track_rexmit (prog, procno, x.uio()->resid ()); //keep our old xid assert (x.iov ()[0].iov_len >= 4); u_int32_t &txid = *reinterpret_cast<u_int32_t *> (x.iov ()[0].iov_base); txid = xid; //update the msg buffer so we send with the new timestamp unsigned int l = x.uio ()->resid (); assert (l == msglen); char *newbuf = suio_flatten (x.uio ()); memcpy (msgbuf, newbuf, msglen); xfree (newbuf); //send it xmit (rexmits); if (rexmits == MAX_REXMIT && sec < MIN_RPC_FAILURE_TIMER) { sec = MIN_RPC_FAILURE_TIMER; nsec = 0; } reset_tmo (); rexmits++; }}voidrpccb_chord::finish_cb (aclnt_cb cb, ptr<bool> del, clnt_stat err) { if (*del) return; *del = true; cb (err);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -