📄 stp_manager.c
字号:
#include <crypt.h>#include <chord_prot.h>#include <misc_utils.h>#include "comm.h"#include <location.h>#include "modlogger.h"#include "coord.h"#include <transport_prot.h>long outbytes;const int shortstats (getenv ("SHORT_STATS") ? 1 : 0);#define CWIND_MULT 5stp_manager::stp_manager (ptr<u_int32_t> _nrcv) : rpc_manager (_nrcv), seqno (0), cwind (1.0), cwind_ewma (1.0), ssthresh (6.0), cwind_cum (0.0), num_cwind_samples (0), num_qed (0), inflight (0), idle_timer (NULL){ delaycb (1, 0, wrap (this, &stp_manager::ratecb)); reset_idle_timer (); st = getusec (); stream_rpcm = New refcounted<tcp_manager> (nrcv);}stp_manager::~stp_manager (){ if (idle_timer) timecb_remove (idle_timer);}voidstp_manager::ratecb () {#ifdef VERBOSE_LOG warnx << "sent " << nsent << " RPCs in the last second\n"; warnx << "received " << *nrcv << " RPCs in the last second\n"; warnx << npending << " RPCs are outstanding\n";#endif // do something if nsent (+ nrcv) is too high xxx? delaycb (1, 0, wrap (this, &stp_manager::ratecb)); nsent = outbytes; *nrcv = 0;}longstp_manager::doRPC_stream (ptr<location> from, ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb){ return stream_rpcm->doRPC (from, l, prog, procno, in, out, cb, NULL);}longstp_manager::doRPC_dead (ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb){#ifdef VERBOSE_LOG modlogger ("stp_manager") << "dead_rpc " << l->id () << " " << l->address () << "\n";#endif /* VERBOSE_LOG */ ref<aclnt> c = aclnt::alloc (dgram_xprt, prog, (sockaddr *)&(l->saddr ())); c->call (procno, in, out, cb); return 0;}boolstp_manager::room_in_window () { return inflight < cwind*CWIND_MULT;}longstp_manager::doRPC (ptr<location> from, ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb, cbtmo_t cb_tmo){ reset_idle_timer (); if (!room_in_window ()) { RPC_delay_args *args = New RPC_delay_args (from, l, prog, procno, in, out, cb, cb_tmo); enqueue_rpc (args); return 0; } else { ref<aclnt> c = aclnt::alloc (dgram_xprt, prog, (sockaddr *)&(l->saddr ())); rpc_state *C = New rpc_state (from, l, cb, cb_tmo, seqno, prog.progno, out); C->procno = procno; C->b = rpccb_chord::alloc (c, wrap (this, &stp_manager::doRPCcb, c, C), wrap (this, &stp_manager::timeout, C), in, out, procno, (sockaddr *)&(l->saddr ())); long sec, nsec; setup_rexmit_timer (from, l, &sec, &nsec); //insert into the Q of RPCs in flight pending.insert_tail (C); inflight++; C->sendtime = getusec (); C->b->send (sec, nsec); nsent++; return seqno++; }}boolstp_manager::timeout (rpc_state *C){ //run through the list of pending RPCs and // remove any that are headed for the host // that just timed out from the window. // if the host is dead, this prevents a bunch // of back-to-back RPCs for that host from // clogging up the window // if this was a congestion loss, we've // increased the window that we "deserve" rpc_state *O = pending.first; while (O) { if (O->loc->id () == C->loc->id () && O != C && O->in_window) { O->in_window = false; inflight--; } O = pending.next (O); } // multiple retransmissions is a good sign the node is dead // don't hold up the window waiting for him to time out if (C->rexmits > 1 && C->in_window) { C->in_window = false; inflight--; } //if there are any RPCs destined for this host, make // them fail right away. if (C->rexmits > MAX_REXMIT) { O = pending.first; while (O) { if (O->loc->id () == C->loc->id () && O != C) O->b->timeout (); O = pending.next (O); } } bool cancel = false; if (C->cb_tmo) { chord_node n; C->loc->fill_node (n); cancel = (C->cb_tmo)(n, C->rexmits); } C->rexmits++; if (C->from->id () != C->loc->id () && C->rexmits == 1) update_cwind (-1); return cancel;}voidstp_manager::doRPCcb (ref<aclnt> c, rpc_state *C, clnt_stat err){ dorpc_res *res = (dorpc_res *)C->out; // warn << "RPCTIMING: " << getusec () << " out = " << (u_int)C->out << " returned\n"; if (err) { nrpcfailed++; C->loc->set_alive (false); warnx << gettime () << " RPC failure: " << err << " destined for " << C->ID << " at " << inet_ntoa (C->loc->saddr().sin_addr) << " seqno " << C->seqno << " out " << (u_int) C->out << "\n"; } else if (res->status == DORPC_MARSHALLERR) { nrpcfailed++; err = RPC_CANTDECODEARGS; warnx << gettime () << " RPC Failure: DORPC_MARSHALLERR for " << C->ID << "\n"; } else if (res->status == DORPC_UNKNOWNNODE) { nrpcfailed++; C->loc->set_alive (false); err = RPC_SYSTEMERROR; warnx << gettime () << " RPC Failure: DORPC_UNKNOWNNODE for " << C->ID << "\n"; } else if (res->status == DORPC_NOHANDLER) { nrpcfailed++; err = RPC_PROGUNAVAIL; warnx << gettime () << " RPC Failure: DORPC_NOHANDLER for " << C->ID << "\n"; } else { assert (res->status == DORPC_OK); u_int64_t sent_time = res->resok->send_time_echo; u_int64_t now = getusec (); // prevent overflow, caused by time reversal if (now >= sent_time) { u_int64_t lat = now - sent_time; update_latency (C->from, C->loc, lat); } } pending.remove (C); (C->cb) (err); if (C->in_window) { inflight--; } update_cwind (C->seqno); rpc_done (C->seqno); delete C;}voidstp_manager::rpc_done (long acked_seqno){ if (Q.first && room_in_window ()) { int qsize = (num_qed > 100) ? 100 : num_qed; int next = (int)(qsize*((float)random()/(float)RAND_MAX)); RPC_delay_args *next_arg = Q.first; for (int i = 0; (next_arg) && (i < next); i++) next_arg = Q.next (next_arg); //if there is an earlier RPC bound for the same destination, send that //this preserves our use of data-driven retransmissions RPC_delay_args *args = Q.first; while ((args != next_arg) && (args->l->id () != next_arg->l->id ())) args = Q.next (args); //stats u_int64_t now = getusec (); u_int64_t diff = now - args->now; lat_inq.push_back (diff); if (lat_inq.size () > 1000) lat_inq.pop_back (); assert (args); Q.remove (args); doRPC (args->from, args->l, args->prog, args->procno, args->in, args->out, args->cb, args->cb_tmo); delete args; num_qed--; } if (Q.first && room_in_window () ) { delaycb (0, 1000000, wrap (this, &stp_manager::rpc_done, acked_seqno)); }}voidstp_manager::reset_idle_timer (){ if (idle_timer) timecb_remove (idle_timer); idle_timer = delaycb (5, 0, wrap (this, &stp_manager::idle));}voidstp_manager::idle () { cwind = 1.0; cwind_ewma = 1.0; ssthresh = 6; idle_timer = NULL;}voidstp_manager::update_cwind (int seq) { if (seq >= 0) { if (cwind < ssthresh) cwind += 1.0; //slow start else cwind += 1.0/cwind; //AI } else { ssthresh = cwind_ewma/2; // MD if (ssthresh < 1.0) ssthresh = 1.0; // cwind = 1.0; cwind = cwind / 2.0; if (cwind < 1.0) cwind = 1.0; } cwind_ewma = (cwind_ewma*49 + cwind)/50; cwind_cum += cwind; num_cwind_samples++; //stats
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -