⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stp_manager.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
#include <crypt.h>#include <chord_prot.h>#include <misc_utils.h>#include "comm.h"#include <location.h>#include "modlogger.h"#include "coord.h"#include <transport_prot.h>long outbytes;const int shortstats (getenv ("SHORT_STATS") ? 1 : 0);#define CWIND_MULT 5stp_manager::stp_manager (ptr<u_int32_t> _nrcv)  : rpc_manager (_nrcv),    seqno (0),    cwind (1.0),    cwind_ewma (1.0),    ssthresh (6.0),    cwind_cum (0.0),    num_cwind_samples (0),    num_qed (0),    inflight (0),    idle_timer (NULL){  delaycb (1, 0, wrap (this, &stp_manager::ratecb));  reset_idle_timer ();  st = getusec ();  stream_rpcm = New refcounted<tcp_manager> (nrcv);}stp_manager::~stp_manager (){  if (idle_timer)    timecb_remove (idle_timer);}voidstp_manager::ratecb () {#ifdef VERBOSE_LOG  warnx << "sent " << nsent << " RPCs in the last second\n";  warnx << "received " << *nrcv << " RPCs in the last second\n";  warnx << npending << " RPCs are outstanding\n";#endif  // do something if nsent (+ nrcv) is too high xxx?  delaycb (1, 0, wrap (this, &stp_manager::ratecb));  nsent = outbytes;  *nrcv = 0;}longstp_manager::doRPC_stream (ptr<location> from, ptr<location> l,			   const rpc_program &prog, int procno, 			   ptr<void> in, void *out, aclnt_cb cb){  return stream_rpcm->doRPC (from, l, prog, procno, in, out, cb, NULL);}longstp_manager::doRPC_dead (ptr<location> l,			 const rpc_program &prog, int procno, 			 ptr<void> in, void *out, aclnt_cb cb){#ifdef VERBOSE_LOG    modlogger ("stp_manager") << "dead_rpc "			    << l->id () << " " << l->address () << "\n";#endif /* VERBOSE_LOG */    ref<aclnt> c = aclnt::alloc (dgram_xprt, prog, 			       (sockaddr *)&(l->saddr ()));    c->call (procno, in, out, cb);   return 0;}boolstp_manager::room_in_window () {  return inflight < cwind*CWIND_MULT;}longstp_manager::doRPC (ptr<location> from, ptr<location> l,		    const rpc_program &prog, int procno, 		    ptr<void> in, void *out, aclnt_cb cb,		    cbtmo_t cb_tmo){  reset_idle_timer ();  if (!room_in_window ()) {    RPC_delay_args *args = New RPC_delay_args (from, l, prog, procno,					       in, out, cb, cb_tmo);    enqueue_rpc (args);    return 0;  } else {    ref<aclnt> c = aclnt::alloc (dgram_xprt, prog, 				 (sockaddr *)&(l->saddr ()));    rpc_state *C = New rpc_state (from, l, cb, cb_tmo, seqno, 				  prog.progno, out);    C->procno = procno;       C->b = rpccb_chord::alloc (c, 			       wrap (this, &stp_manager::doRPCcb, c, C),			       wrap (this, &stp_manager::timeout, C),			       in,			       out,			       procno, 			       (sockaddr *)&(l->saddr ()));        long sec, nsec;    setup_rexmit_timer (from, l, &sec, &nsec);    //insert into the Q of RPCs in flight    pending.insert_tail (C);    inflight++;    C->sendtime = getusec ();    C->b->send (sec, nsec);    nsent++;        return seqno++;  }}boolstp_manager::timeout (rpc_state *C){  //run through the list of pending RPCs and   // remove any that are headed for the host  // that just timed out from the window.  // if the host is dead, this prevents a bunch  // of back-to-back RPCs for that host from  // clogging up the window  // if this was a congestion loss, we've   // increased the window that we "deserve"  rpc_state *O = pending.first;  while (O) {    if (O->loc->id () == C->loc->id () &&	O != C &&	O->in_window) {      O->in_window = false;      inflight--;    }    O = pending.next (O);  }  // multiple retransmissions is a good sign the node is dead  // don't hold up the window waiting for him to time out  if (C->rexmits > 1 && C->in_window) {    C->in_window = false;    inflight--;  }  //if there are any RPCs destined for this host, make  // them fail right away.   if (C->rexmits > MAX_REXMIT) {    O = pending.first;    while (O) {      if (O->loc->id () == C->loc->id () && O != C) 	O->b->timeout ();      O = pending.next (O);    }  }    bool cancel = false;  if (C->cb_tmo) {    chord_node n;    C->loc->fill_node (n);    cancel = (C->cb_tmo)(n, C->rexmits);  }  C->rexmits++;  if (C->from->id () != C->loc->id () && C->rexmits == 1)    update_cwind (-1);  return cancel;}voidstp_manager::doRPCcb (ref<aclnt> c, rpc_state *C, clnt_stat err){  dorpc_res *res = (dorpc_res *)C->out;    //  warn << "RPCTIMING: " << getusec () << " out = " << (u_int)C->out << " returned\n";  if (err) {    nrpcfailed++;    C->loc->set_alive (false);    warnx << gettime () << " RPC failure: " << err          << " destined for " << C->ID	  << " at " << inet_ntoa (C->loc->saddr().sin_addr)	  << " seqno " << C->seqno	  << " out " << (u_int) C->out	  << "\n";  } else if (res->status == DORPC_MARSHALLERR) {    nrpcfailed++;    err = RPC_CANTDECODEARGS;    warnx << gettime () << " RPC Failure: DORPC_MARSHALLERR for " << C->ID           << "\n";  } else if (res->status == DORPC_UNKNOWNNODE) {    nrpcfailed++;    C->loc->set_alive (false);    err = RPC_SYSTEMERROR;    warnx << gettime () << " RPC Failure: DORPC_UNKNOWNNODE for " << C->ID           << "\n";  } else if (res->status == DORPC_NOHANDLER) {    nrpcfailed++;    err = RPC_PROGUNAVAIL;    warnx << gettime () << " RPC Failure: DORPC_NOHANDLER for " << C->ID	  << "\n";  } else {    assert (res->status == DORPC_OK);    u_int64_t sent_time = res->resok->send_time_echo;    u_int64_t now = getusec ();    // prevent overflow, caused by time reversal    if (now >= sent_time) {      u_int64_t lat = now - sent_time;      update_latency (C->from, C->loc, lat);    }   }    pending.remove (C);  (C->cb) (err);  if (C->in_window) {    inflight--;  }  update_cwind (C->seqno);  rpc_done (C->seqno);  delete C;}voidstp_manager::rpc_done (long acked_seqno){  if (Q.first && room_in_window ()) {    int qsize = (num_qed > 100) ? 100 :  num_qed;    int next = (int)(qsize*((float)random()/(float)RAND_MAX));    RPC_delay_args *next_arg = Q.first;    for (int i = 0; (next_arg) && (i < next); i++)      next_arg = Q.next (next_arg);    //if there is an earlier RPC bound for the same destination, send that    //this preserves our use of data-driven retransmissions    RPC_delay_args *args = Q.first;    while ((args != next_arg) && (args->l->id () != next_arg->l->id ()))      args = Q.next (args);    //stats    u_int64_t now = getusec ();    u_int64_t diff = now - args->now;    lat_inq.push_back (diff);    if (lat_inq.size () > 1000) lat_inq.pop_back ();    assert (args);    Q.remove (args);    doRPC (args->from, args->l, args->prog,	   args->procno, 	   args->in,	   args->out,	   args->cb,	   args->cb_tmo);    delete args;    num_qed--;  }  if (Q.first && room_in_window () ) {    delaycb (0, 1000000, wrap (this, &stp_manager::rpc_done, acked_seqno));  }}voidstp_manager::reset_idle_timer (){  if (idle_timer) timecb_remove (idle_timer);  idle_timer = delaycb (5, 0, wrap (this, &stp_manager::idle));}voidstp_manager::idle () {  cwind = 1.0;  cwind_ewma = 1.0;  ssthresh = 6;  idle_timer = NULL;}voidstp_manager::update_cwind (int seq) {  if (seq >= 0) {    if (cwind < ssthresh)       cwind += 1.0; //slow start    else      cwind += 1.0/cwind; //AI  } else {    ssthresh = cwind_ewma/2; // MD    if (ssthresh < 1.0) ssthresh = 1.0;    //    cwind = 1.0;    cwind = cwind / 2.0;    if (cwind < 1.0) cwind = 1.0;  }  cwind_ewma = (cwind_ewma*49 + cwind)/50;  cwind_cum += cwind;  num_cwind_samples++;    //stats

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -